blob: 71577ba1d8895402aac2cff57d04eb6a6c7fc2b5 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import sys
from time import sleep
import system_test
from system_test import TestCase, Qdrouterd, QdManager, Process, SkipIfNeeded
from subprocess import PIPE
def python_37_available():
if sys.version_info >= (3, 7):
return True
def curl_available():
popen_args = ['curl', '--version']
try:
process = Process(popen_args,
name='curl_check',
stdout=PIPE,
expect=None,
universal_newlines=True)
out = process.communicate()[0]
return True
except:
return False
def quart_available():
"""
Checks if quart version is greater than 0.13
"""
popen_args = ['quart', '--version']
try:
process = Process(popen_args,
name='quart_check',
stdout=PIPE,
expect=None,
universal_newlines=True)
out = process.communicate()[0]
parts = out.split(".")
major_version = parts[0]
if int(major_version[-1]) > 0 or int(parts[1]) >= 13:
return True
return False
except Exception as e:
print(e)
print("quart_not_available")
return False
def skip_test():
if python_37_available() and quart_available() and curl_available():
return False
return True
class Http2TestBase(TestCase):
def run_curl(self, args=None, regexp=None, timeout=system_test.TIMEOUT, address=None):
# Tell with -m / --max-time the maximum time, in seconds, that you
# allow the command line to spend before curl exits with a
# timeout error code (28).
local_args = ["--http2-prior-knowledge"]
if args:
local_args = args + ["--http2-prior-knowledge"]
popen_args = ['curl',
str(address),
'--max-time', str(timeout)] + local_args
p = self.popen(popen_args,
name='curl-' + self.id(), stdout=PIPE, expect=None,
universal_newlines=True)
out = p.communicate()[0]
assert (p.returncode == 0)
return out
class CommonHttp2Tests():
"""
Common Base class containing all tests. These tests are run by all
topologies of routers.
"""
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
# Tests the HTTP2 head request
def test_head_request(self):
# Run curl 127.0.0.1:port --http2-prior-knowledge --head
address = self.router_qdra.http_addresses[0]
out = self.run_curl(args=["--head"], address=address)
self.assertIn('HTTP/2 200', out)
self.assertIn('server: hypercorn-h2', out)
self.assertIn('content-type: text/html; charset=utf-8', out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_get_request(self):
# Run curl 127.0.0.1:port --http2-prior-knowledge
address = self.router_qdra.http_addresses[0]
out = self.run_curl(address=address)
i = 0
ret_string = ""
while (i < 1000):
ret_string += str(i) + ","
i += 1
self.assertIn(ret_string, out)
# @SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
# def test_large_get_request(self):
# Tests a large get request. Response is more than 50k which means it
# will span many qd_http2_buffer_t objects.
# Run curl 127.0.0.1:port/largeget --http2-prior-knowledge
# address = self.router_qdra.http_addresses[0] + "/largeget"
# out = self.run_curl(address=address)
# self.assertIn("49996,49997,49998,49999", out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_post_request(self):
# curl -d "fname=John&lname=Doe" -X POST 127.0.0.1:9000/myinfo --http2-prior-knowledge
address = self.router_qdra.http_addresses[0] + "/myinfo"
out = self.run_curl(args=['-d', 'fname=John&lname=Doe', '-X', 'POST'], address=address)
self.assertIn('Success! Your first name is John, last name is Doe', out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_delete_request(self):
# curl -X DELETE "http://127.0.0.1:9000/myinfo/delete/22122" -H "accept: application/json" --http2-prior-knowledge
address = self.router_qdra.http_addresses[0] + "/myinfo/delete/22122"
out = self.run_curl(args=['-X', 'DELETE'], address=address)
self.assertIn('{"fname": "John", "lname": "Doe", "id": "22122"}', out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_put_request(self):
# curl -d "fname=John&lname=Doe" -X PUT 127.0.0.1:9000/myinfo --http2-prior-knowledge
address = self.router_qdra.http_addresses[0] + "/myinfo"
out = self.run_curl(args=['-d', 'fname=John&lname=Doe', '-X', 'PUT'], address=address)
self.assertIn('Success! Your first name is John, last name is Doe', out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_patch_request(self):
# curl -d "fname=John&lname=Doe" -X PATCH 127.0.0.1:9000/myinfo --http2-prior-knowledge
address = self.router_qdra.http_addresses[0] + "/patch"
out = self.run_curl(args=['--data', '{\"op\":\"add\",\"path\":\"/user\",\"value\":\"jane\"}', '-X', 'PATCH'], address=address)
self.assertIn('"op":"add"', out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_404(self):
# Run curl 127.0.0.1:port/unavilable --http2-prior-knowledge
address = self.router_qdra.http_addresses[0] + "/unavilable"
out = self.run_curl(address=address)
self.assertIn('404 Not Found', out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_500(self):
# Run curl 127.0.0.1:port/unavilable --http2-prior-knowledge
address = self.router_qdra.http_addresses[0] + "/test/500"
out = self.run_curl(address=address)
self.assertIn('500 Internal Server Error', out)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_get_image_png(self):
# Run curl 127.0.0.1:port --http2-prior-knowledge
passed = False
try:
address = self.router_qdra.http_addresses[0] + "/images/balanced-routing.png"
self.run_curl(address=address)
except UnicodeDecodeError as u:
if "codec can't decode byte 0x89" in str(u):
passed = True
self.assertTrue(passed)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_get_image_jpg(self):
# Run curl 127.0.0.1:port --http2-prior-knowledge
passed = False
try:
address = self.router_qdra.http_addresses[0] + "/images/apache.jpg"
self.run_curl(address=address)
except UnicodeDecodeError as u:
print(u)
if "codec can't decode byte 0xff" in str(u):
passed = True
self.assertTrue(passed)
def check_connector_delete(self, client_addr, server_addr):
# Run curl 127.0.0.1:port --http2-prior-knowledge
# We are first making sure that the http request goes thru successfully.
out = self.run_curl(address=client_addr)
# Run a qdmanage query on connections to see how many qdr_connections are
# there on the egress router
qd_manager = QdManager(self, address=server_addr)
connections = qd_manager.query('org.apache.qpid.dispatch.connection')
self.assertGreaterEqual(len(connections), 2)
server_conn_found = False
for conn in connections:
if os.environ['SERVER_LISTEN_PORT'] in conn['name']:
server_conn_found = True
break
self.assertTrue(server_conn_found)
# Run a qdmanage DELETE on the httpConnector
http_connectors = qd_manager.query('org.apache.qpid.dispatch.httpConnector')
self.assertEqual(len(http_connectors), 1)
# Delete the httpConnector
qd_manager.delete("org.apache.qpid.dispatch.httpConnector", name=self.connector_name)
# Make sure the connector is gone
http_connectors = qd_manager.query('org.apache.qpid.dispatch.httpConnector')
self.assertEqual(len(http_connectors), 0)
# Deleting the connector must have taken out the connection to the server.
connections = qd_manager.query('org.apache.qpid.dispatch.connection')
http_server_conn_found = False
for conn in connections:
if os.environ['SERVER_LISTEN_PORT'] in conn['name']:
server_conn_found = True
break
self.assertFalse(http_server_conn_found)
sleep(2)
# Now, run a curl client GET request with a timeout
request_timed_out = False
try:
out = self.run_curl(address=client_addr, timeout=5)
print(out)
except Exception as e:
request_timed_out = True
self.assertTrue(request_timed_out)
# Add back the httpConnector
# qdmanage CREATE type=httpConnector address=examples.com host=127.0.0.1 port=80 protocolVersion=HTTP2
create_result = qd_manager.create("org.apache.qpid.dispatch.httpConnector", self.connector_props)
num_tries = 2
tries = 0
conn_present = False
while tries < num_tries:
connections = qd_manager.query('org.apache.qpid.dispatch.connection')
tries += 1
if (len(connections) < 2):
sleep(2)
else:
conn_present = True
self.assertTrue(conn_present)
out = self.run_curl(address=client_addr)
ret_string = ""
i = 0
while (i < 1000):
ret_string += str(i) + ","
i += 1
self.assertIn(ret_string, out)
class Http2TestOneStandaloneRouter(Http2TestBase, CommonHttp2Tests):
@classmethod
def setUpClass(cls):
super(Http2TestOneStandaloneRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
os.environ["QUART_APP"] = "http2server:app"
os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
py_string='python3',
server_file="http2_server.py")
name = "http2-test-standalone-router"
cls.connector_name = 'connectorToBeDeleted'
cls.connector_props = {
'port': os.getenv('SERVER_LISTEN_PORT'),
'address': 'examples',
'host': '127.0.0.1',
'protocolVersion': 'HTTP2',
'name': cls.connector_name
}
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
('httpConnector', cls.connector_props)
])
cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_zzz_http_connector_delete(self):
self.check_connector_delete(client_addr=self.router_qdra.http_addresses[0],
server_addr=self.router_qdra.addresses[0])
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_000_stats(self):
# Run curl 127.0.0.1:port --http2-prior-knowledge
address = self.router_qdra.http_addresses[0]
qd_manager = QdManager(self, address=self.router_qdra.addresses[0])
# First request
out = self.run_curl(address=address)
# Second request
address = self.router_qdra.http_addresses[0] + "/myinfo"
out = self.run_curl(args=['-d', 'fname=Mickey&lname=Mouse', '-X', 'POST'], address=address)
self.assertIn('Success! Your first name is Mickey, last name is Mouse', out)
stats = qd_manager.query('org.apache.qpid.dispatch.httpRequestInfo')
self.assertEqual(len(stats), 2)
# Give time for the core thread to augment the stats.
i = 0
while i < 3:
if not stats or stats[0].get('requests') < 2:
i += 1
sleep(1)
stats = qd_manager.query('org.apache.qpid.dispatch.httpRequestInfo')
else:
break
for s in stats:
self.assertEqual(s.get('requests'), 2)
self.assertEqual(s.get('details').get('GET:200'), 1)
self.assertEqual(s.get('details').get('POST:200'), 1)
if stats[0].get('direction') == 'out':
self.assertEqual(stats[1].get('direction'), 'in')
self.assertEqual(stats[0].get('bytesOut'), 24)
self.assertEqual(stats[0].get('bytesIn'), 3944)
self.assertEqual(stats[1].get('bytesOut'), 3944)
self.assertEqual(stats[1].get('bytesIn'), 24)
else:
self.assertEqual(stats[0].get('direction'), 'in')
self.assertEqual(stats[1].get('direction'), 'out')
self.assertEqual(stats[0].get('bytesOut'), 3944)
self.assertEqual(stats[0].get('bytesIn'), 24)
self.assertEqual(stats[1].get('bytesOut'), 24)
self.assertEqual(stats[1].get('bytesIn'), 3944)
class Http2TestOneEdgeRouter(Http2TestBase, CommonHttp2Tests):
@classmethod
def setUpClass(cls):
super(Http2TestOneEdgeRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
os.environ["QUART_APP"] = "http2server:app"
os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
py_string='python3',
server_file="http2_server.py")
name = "http2-test-router"
cls.connector_name = 'connectorToBeDeleted'
cls.connector_props = {
'port': os.getenv('SERVER_LISTEN_PORT'),
'address': 'examples',
'host': '127.0.0.1',
'protocolVersion': 'HTTP2',
'name': cls.connector_name
}
config = Qdrouterd.Config([
('router', {'mode': 'edge', 'id': 'QDR'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
('httpConnector', cls.connector_props)
])
cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_zzz_http_connector_delete(self):
self.check_connector_delete(client_addr=self.router_qdra.http_addresses[0],
server_addr=self.router_qdra.addresses[0])
class Http2TestOneInteriorRouter(Http2TestBase, CommonHttp2Tests):
@classmethod
def setUpClass(cls):
super(Http2TestOneInteriorRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
os.environ["QUART_APP"] = "http2server:app"
os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
py_string='python3',
server_file="http2_server.py")
name = "http2-test-router"
cls.connector_name = 'connectorToBeDeleted'
cls.connector_props = {
'port': os.getenv('SERVER_LISTEN_PORT'),
'address': 'examples',
'host': '127.0.0.1',
'protocolVersion': 'HTTP2',
'name': cls.connector_name
}
config = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
('httpConnector', cls.connector_props)
])
cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_zzz_http_connector_delete(self):
self.check_connector_delete(client_addr=self.router_qdra.http_addresses[0],
server_addr=self.router_qdra.addresses[0])
class Http2TestTwoRouter(Http2TestBase, CommonHttp2Tests):
@classmethod
def setUpClass(cls):
super(Http2TestTwoRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
os.environ["QUART_APP"] = "http2server:app"
os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
py_string='python3',
server_file="http2_server.py")
name = "http2-test-router"
inter_router_port = cls.tester.get_port()
config_qdra = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
('listener', {'role': 'inter-router', 'port': inter_router_port})
])
cls.connector_name = 'connectorToBeDeleted'
cls.connector_props = {
'port': os.getenv('SERVER_LISTEN_PORT'),
'address': 'examples',
'host': '127.0.0.1',
'protocolVersion': 'HTTP2',
'name': cls.connector_name
}
config_qdrb = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.B'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpConnector', cls.connector_props),
('connector', {'name': 'connectorToA', 'role': 'inter-router',
'port': inter_router_port,
'verifyHostname': 'no'})
])
cls.router_qdra = cls.tester.qdrouterd(name, config_qdra, wait=True)
cls.router_qdrb = cls.tester.qdrouterd(name, config_qdrb, wait=True)
cls.router_qdra.wait_router_connected('QDR.B')
cls.router_qdrb.wait_router_connected('QDR.A')
sleep(2)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_000_stats(self):
# Run curl 127.0.0.1:port --http2-prior-knowledge
address = self.router_qdra.http_addresses[0]
qd_manager_a = QdManager(self, address=self.router_qdra.addresses[0])
stats_a = qd_manager_a.query('org.apache.qpid.dispatch.httpRequestInfo')
# First request
self.run_curl(address=address)
address = self.router_qdra.http_addresses[0] + "/myinfo"
# Second request
out = self.run_curl(args=['-d', 'fname=Mickey&lname=Mouse', '-X', 'POST'], address=address)
self.assertIn('Success! Your first name is Mickey, last name is Mouse', out)
# Give time for the core thread to augment the stats.
i = 0
while i < 3:
if not stats_a or stats_a[0].get('requests') < 2:
sleep(1)
i += 1
stats_a = qd_manager_a.query('org.apache.qpid.dispatch.httpRequestInfo')
else:
break
self.assertEqual(len(stats_a), 1)
self.assertEqual(stats_a[0].get('requests'), 2)
self.assertEqual(stats_a[0].get('direction'), 'in')
self.assertEqual(stats_a[0].get('bytesOut'), 3944)
self.assertEqual(stats_a[0].get('bytesIn'), 24)
qd_manager_b = QdManager(self, address=self.router_qdrb.addresses[0])
stats_b = qd_manager_b.query('org.apache.qpid.dispatch.httpRequestInfo')
self.assertEqual(len(stats_b), 1)
i = 0
while i < 3:
s = stats_b[0]
if not stats_b or stats_b[0].get('requests') < 2:
i += 1
sleep(1)
stats_b = qd_manager_b.query('org.apache.qpid.dispatch.httpRequestInfo')
else:
break
self.assertEqual(stats_b[0].get('requests'), 2)
self.assertEqual(stats_b[0].get('direction'), 'out')
self.assertEqual(stats_b[0].get('bytesOut'), 24)
self.assertEqual(stats_b[0].get('bytesIn'), 3944)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_zzz_http_connector_delete(self):
self.check_connector_delete(client_addr=self.router_qdra.http_addresses[0],
server_addr=self.router_qdrb.addresses[0])
class Http2TestEdgeInteriorRouter(Http2TestBase, CommonHttp2Tests):
"""
The interior router connects to the HTTP2 server and the curl client
connects to the edge router.
"""
@classmethod
def setUpClass(cls):
super(Http2TestEdgeInteriorRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
os.environ["QUART_APP"] = "http2server:app"
os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
py_string='python3',
server_file="http2_server.py")
inter_router_port = cls.tester.get_port()
config_edgea = Qdrouterd.Config([
('router', {'mode': 'edge', 'id': 'EDGE.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
('connector', {'name': 'connectorToA', 'role': 'edge',
'port': inter_router_port,
'verifyHostname': 'no'})
])
config_qdrb = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('listener', {'role': 'edge', 'port': inter_router_port}),
('httpConnector',
{'port': os.getenv('SERVER_LISTEN_PORT'), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'})
])
cls.router_qdrb = cls.tester.qdrouterd("interior-router", config_qdrb, wait=True)
cls.router_qdra = cls.tester.qdrouterd("edge-router", config_edgea)
sleep(3)
class Http2TestInteriorEdgeRouter(Http2TestBase, CommonHttp2Tests):
"""
The edge router connects to the HTTP2 server and the curl client
connects to the interior router.
"""
@classmethod
def setUpClass(cls):
super(Http2TestInteriorEdgeRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
os.environ["QUART_APP"] = "http2server:app"
os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
py_string='python3',
server_file="http2_server.py")
inter_router_port = cls.tester.get_port()
config_edge = Qdrouterd.Config([
('router', {'mode': 'edge', 'id': 'EDGE.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpConnector',
{'port': os.getenv('SERVER_LISTEN_PORT'), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
('connector', {'name': 'connectorToA', 'role': 'edge',
'port': inter_router_port,
'verifyHostname': 'no'})
])
config_qdra = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('listener', {'role': 'edge', 'port': inter_router_port}),
('httpListener',
{'port': cls.tester.get_port(), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
])
cls.router_qdra = cls.tester.qdrouterd("interior-router", config_qdra, wait=True)
cls.router_qdrb = cls.tester.qdrouterd("edge-router", config_edge)
sleep(3)
class Http2TestEdgeToEdgeViaInteriorRouter(Http2TestBase, CommonHttp2Tests):
"""
The edge router connects to the HTTP2 server and the curl client
connects to another edge router. The two edge routers are connected
via an interior router.
"""
@classmethod
def setUpClass(cls):
super(Http2TestEdgeToEdgeViaInteriorRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
os.environ["QUART_APP"] = "http2server:app"
os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
py_string='python3',
server_file="http2_server.py")
cls.connector_name = 'connectorToBeDeleted'
cls.connector_props = {
'port': os.getenv('SERVER_LISTEN_PORT'),
'address': 'examples',
'host': '127.0.0.1',
'protocolVersion': 'HTTP2',
'name': cls.connector_name
}
inter_router_port = cls.tester.get_port()
config_edge_b = Qdrouterd.Config([
('router', {'mode': 'edge', 'id': 'EDGE.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('httpConnector', cls.connector_props),
('connector', {'name': 'connectorToA', 'role': 'edge',
'port': inter_router_port,
'verifyHostname': 'no'})
])
config_qdra = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('listener', {'role': 'edge', 'port': inter_router_port}),
])
config_edge_a = Qdrouterd.Config([
('router', {'mode': 'edge', 'id': 'EDGE.B'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal',
'host': '0.0.0.0'}),
('httpListener',
{'port': cls.tester.get_port(), 'address': 'examples',
'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
('connector', {'name': 'connectorToA', 'role': 'edge',
'port': inter_router_port,
'verifyHostname': 'no'})
])
cls.interior_qdr = cls.tester.qdrouterd("interior-router", config_qdra,
wait=True)
cls.router_qdra = cls.tester.qdrouterd("edge-router-a", config_edge_a)
cls.router_qdrb = cls.tester.qdrouterd("edge-router-b", config_edge_b)
sleep(5)
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
def test_zzz_http_connector_delete(self):
self.check_connector_delete(client_addr=self.router_qdra.http_addresses[0],
server_addr=self.router_qdrb.addresses[0])