# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import os
import threading
import sys
import ssl
from urllib2 import urlopen, build_opener, HTTPSHandler
from urllib2 import HTTPError, URLError
import urllib2 as urllib
except ImportError:
# python3
from urllib.request import urlopen, build_opener, HTTPSHandler
from urllib.error import HTTPError, URLError
import urllib
from system_test import TIMEOUT, Process
from subprocess import PIPE, STDOUT
from system_test import TestCase, Qdrouterd, main_module, DIR
from system_test import unittest
class RouterTestHttp(TestCase):
@staticmethod
def ssl_file(name):
    """Return the path of a file inside the ``ssl_certs`` directory under DIR.

    Declared as a static method because the function takes no ``self``/``cls``
    parameter, yet it is invoked both through the class
    (``cls.ssl_file(...)``) and through instances (``self.ssl_file(...)``).
    Without the decorator an instance call would pass the instance as an
    extra positional argument and raise TypeError.

    :param name: file name of the certificate or key, e.g. 'ca-certificate.pem'
    :return: ``os.path.join(DIR, 'ssl_certs', name)``
    """
    return os.path.join(DIR, 'ssl_certs', name)
@classmethod
def get(cls, url):
    """Fetch ``url`` and return the response body decoded as UTF-8 text.

    On Python >= 2.7 the request trusts the test CA certificate
    ('ca-certificate.pem') so that HTTPS listeners using the test
    certificates verify correctly. The CA is supplied through an explicit
    SSL context rather than the ``cafile=`` keyword of ``urlopen``: that
    keyword was deprecated in Python 3.6 and removed in 3.13, while
    ``context=`` is accepted by every interpreter that accepted ``cafile=``.

    On older interpreters a plain request without CA configuration is made.

    :param url: http:// or https:// URL to retrieve
    :return: the response body as text
    :raises HTTPError, URLError: propagated unchanged from ``urlopen``
    """
    if sys.version_info >= (2, 7):
        tls_context = ssl.create_default_context(cafile=cls.ssl_file('ca-certificate.pem'))
        http_data = urlopen(url, context=tls_context)
    else:
        req = urllib.Request(url=url)
        http_data = urlopen(req)
    try:
        return http_data.read().decode('utf-8')
    finally:
        # Release the connection even if reading or decoding fails.
        http_data.close()
# Build an HTTPS opener around an explicit SSL context for fetching `url`.
# NOTE(review): only the context/opener set-up is visible in this excerpt.
# Nothing here loads a client certificate into `context`, and nothing is
# returned, although assert_get_cert() compares this method's result with the
# expected page body -- confirm the remaining statements against the complete
# file before relying on this method.
def get_cert(cls, url):
# Default SSL context from the standard library.
context = ssl.create_default_context()
# Opener whose HTTPS handler performs TLS using `context`.
opener = build_opener(HTTPSHandler(context=context))
# Run the `qdmanage` command-line tool and return what it printed.
#   cmd     -- qdmanage arguments as a single string; split on single spaces
#   input   -- data handed to the process through communicate()
#   expect  -- expected exit status passed to self.popen (default Process.EXIT_OK)
#   address -- value for `--bus`; falls back to self.address() when falsy
# stderr is redirected into stdout, so the returned text contains both streams.
# Raises Exception carrying the captured output, or the original error text
# when there was no output.
# NOTE(review): the closing parenthesis of the popen() call and the `try:`
# that pairs with the `except` below are not visible in this excerpt --
# confirm against the complete file.
def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
p = self.popen(
# Command line: qdmanage <cmd...> --bus <address> --indent=-1 --timeout <TIMEOUT>
['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
# communicate() returns (stdout, stderr); index 0 is the merged output.
out = p.communicate(input)[0]
except Exception as e:
# Prefer the tool's own output as the error message when there is any.
raise Exception(out if out else str(e))
return out
def assert_get(self, url):
    """Assert that ``<url>/system_tests_http.txt`` is served with the expected body."""
    page_url = "%s/system_tests_http.txt" % url
    body = self.get(page_url)
    self.assertEqual(u'HTTP test\n', body)
def assert_get_cert(self, url):
    """Assert that ``<url>/system_tests_http.txt`` fetched via get_cert() has the expected body."""
    page_url = "%s/system_tests_http.txt" % url
    body = self.get_cert(page_url)
    self.assertEqual(u'HTTP test\n', body)
def test_listen_error(self):
    """Make sure a router exits if an initial HTTP listener fails, doesn't hang"""
    # Both listeners are given the same port; presumably that clash is what
    # makes the initial HTTP listener fail.
    shared_port = self.get_port()
    router_section = ('router', {'mode': 'standalone', 'id': 'bad'})
    plain_listener = ('listener', {'port': shared_port, 'maxFrameSize': '2048', 'stripAnnotations': 'no'})
    http_listener = ('listener', {'port': shared_port, 'http': True})
    config = Qdrouterd.Config([router_section, plain_listener, http_listener])

    # wait=False: do not block on start-up, the router is expected to exit.
    failing_router = Qdrouterd(name="expect_fail", config=config, wait=False)
    exit_status = failing_router.wait()
    self.assertEqual(1, exit_status)
def test_http_listener_delete(self):
    """Verify that deleting an HTTP listener through qdmanage is rejected.

    Starts a standalone router with one plain listener and one named HTTP
    listener, issues a qdmanage DELETE for the HTTP listener, and checks
    that the command fails with the "HTTP listeners cannot be deleted"
    error.

    The test also asserts that the failure actually happened; without that
    final check a DELETE that unexpectedly succeeded would let the test
    pass without verifying anything.
    """
    name = 'delete_listener'

    normal_listen_port = self.get_port()
    http_delete_listen_port = self.get_port()

    config = Qdrouterd.Config([
        ('router', {'mode': 'standalone', 'id': 'A'}),
        ('listener', {'port': normal_listen_port, 'maxFrameSize': '2048', 'stripAnnotations': 'no'}),
        ('listener', {'name': name, 'port': http_delete_listen_port, 'http': True})])
    router = self.qdrouterd(name="expect_fail_1", config=config, wait=True)
    exception_occurred = False

    def address():
        # Management address: the first address exposed by the router.
        return router.addresses[0]

    long_type = 'org.apache.qpid.dispatch.listener'
    delete_command = 'DELETE --type=' + long_type + ' --name=' + name
    try:
        self.run_qdmanage(delete_command, address=address())
    except Exception as e:
        exception_occurred = True
        self.assertTrue("BadRequestStatus: HTTP listeners cannot be deleted" in str(e))

    # The DELETE must have been refused.
    self.assertTrue(exception_occurred)
def test_http_get(self):
    """Fetch a static file over HTTP from two listeners, sequentially and concurrently.

    One listener uses the deprecated ``httpRoot`` attribute and the other
    ``httpRootDir``; both serve the directory containing this file. For each
    port the test checks that the known file is served and that an unknown
    path raises HTTPError. Finally it checks that an https:// request to a
    listener without SSL configured raises URLError.

    The worker threads initialise the Thread base class and are started
    before being joined; joining a thread that was never started raises
    RuntimeError.
    """
    config = Qdrouterd.Config([
        ('router', {'id': 'QDR.HTTP'}),
        # httpRoot has been deprecated. We are using it here to test backward compatibility.
        ('listener', {'port': self.get_port(), 'httpRoot': os.path.dirname(__file__)}),
        ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
    ])
    r = self.qdrouterd('http-test-router', config)

    def test(port):
        self.assert_get("http://localhost:%d" % port)
        self.assertRaises(HTTPError, urlopen, "http://localhost:%d/nosuch" % port)

    # Sequential calls on multiple ports
    for port in r.ports:
        test(port)

    # Concurrent calls on multiple ports
    class TestThread(threading.Thread):
        def __init__(self, port):
            threading.Thread.__init__(self)
            self.port, self.ex = port, None

        def run(self):
            # Capture any failure so the main thread can re-raise it.
            try:
                test(self.port)
            except Exception as e:
                self.ex = e

    threads = [TestThread(p) for p in r.ports + r.ports]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    for t in threads:
        if t.ex:
            raise t.ex

    # https not configured
    self.assertRaises(URLError, urlopen, "https://localhost:%d/nosuch" % r.ports[0])
def test_http_metrics(self):
    """Check that ``/metrics`` is served on every HTTP listener.

    For each listener port the endpoint must answer with status 200 and the
    body must mention a fixed set of metric names. The check runs once per
    port sequentially and then from several threads at the same time.

    The test is a no-op on Python older than 2.7. The CA certificate is
    passed to ``urlopen`` through an SSL context because the ``cafile=``
    keyword was removed in Python 3.13. Metric names are checked with
    ``assertIn`` so the checks survive ``python -O`` and report the missing
    name on failure.
    """
    if not sys.version_info >= (2, 7):
        return

    config = Qdrouterd.Config([
        ('router', {'id': 'QDR.METRICS'}),
        ('listener', {'port': self.get_port(), 'http': 'yes'}),
        ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
    ])
    r = self.qdrouterd('metrics-test-router', config)

    expected_metrics = (
        'connections',
        'deliveries_ingress',
        'deliveries_delayed_1sec',
        'deliveries_delayed_10sec',
        'deliveries_redirected_to_fallback',
    )

    def test(port):
        # One context per request, mirroring what cafile= did internally.
        tls_context = ssl.create_default_context(cafile=self.ssl_file('ca-certificate.pem'))
        result = urlopen("http://localhost:%d/metrics" % port, context=tls_context)
        self.assertEqual(200, result.getcode())
        data = result.read().decode('utf-8')
        for metric in expected_metrics:
            self.assertIn(metric, data)

    # Sequential calls on multiple ports
    for port in r.ports:
        test(port)

    # Concurrent calls on multiple ports
    class TestThread(threading.Thread):
        def __init__(self, port):
            threading.Thread.__init__(self)
            self.port, self.ex = port, None

        def run(self):
            # Capture any failure so the main thread can re-raise it.
            try:
                test(self.port)
            except Exception as e:
                self.ex = e

    threads = [TestThread(p) for p in r.ports + r.ports]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    for t in threads:
        if t.ex:
            raise t.ex
def test_http_healthz(self):
    """Check that ``/healthz`` answers with status 200 on every HTTP listener.

    The check runs once per listener port sequentially and then from
    several threads at the same time.

    The test is a no-op on Python older than 2.7. The CA certificate is
    passed to ``urlopen`` through an SSL context because the ``cafile=``
    keyword was removed in Python 3.13.
    """
    if not sys.version_info >= (2, 7):
        return

    config = Qdrouterd.Config([
        ('router', {'id': 'QDR.HEALTHZ'}),
        ('listener', {'port': self.get_port(), 'http': 'yes'}),
        ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
    ])
    # NOTE(review): the router name matches the one used by test_http_metrics;
    # left unchanged here -- confirm whether a distinct name is wanted.
    r = self.qdrouterd('metrics-test-router', config)

    def test(port):
        # One context per request, mirroring what cafile= did internally.
        tls_context = ssl.create_default_context(cafile=self.ssl_file('ca-certificate.pem'))
        result = urlopen("http://localhost:%d/healthz" % port, context=tls_context)
        self.assertEqual(200, result.getcode())

    # Sequential calls on multiple ports
    for port in r.ports:
        test(port)

    # Concurrent calls on multiple ports
    class TestThread(threading.Thread):
        def __init__(self, port):
            threading.Thread.__init__(self)
            self.port, self.ex = port, None

        def run(self):
            # Capture any failure so the main thread can re-raise it.
            try:
                test(self.port)
            except Exception as e:
                self.ex = e

    threads = [TestThread(p) for p in r.ports + r.ports]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    for t in threads:
        if t.ex:
            raise t.ex
def test_https_get(self):
def listener(**kwargs):
args = dict(kwargs)
args.update({'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)})
return ('listener', args)
config = Qdrouterd.Config([
('router', {'id': 'QDR.HTTPS'}),
('sslProfile', {'name': 'simple-ssl',
'caCertFile': self.ssl_file('ca-certificate.pem'),
'certFile': self.ssl_file('server-certificate.pem'),
'privateKeyFile': self.ssl_file('server-private-key.pem'),
'password': 'server-password'
listener(sslProfile='simple-ssl', requireSsl=False, authenticatePeer=False),
listener(sslProfile='simple-ssl', requireSsl=True, authenticatePeer=False),
listener(sslProfile='simple-ssl', requireSsl=True, authenticatePeer=True)])
# saslMechanisms='EXTERNAL'
r = self.qdrouterd('https-test-router', config)
self.assert_get("https://localhost:%s" % r.ports[0])
# requireSsl=false Allows simple-ssl HTTP
# The following test will be commented out if you are running a version lower than
# libwebsockets 3.2.0
${TEST_OPTION_ALLOW_HTTP_ON_HTTPS_LISTENER}self.assert_get("http://localhost:%s" % r.ports[0])
self.assert_get("https://localhost:%s" % r.ports[1])
# requireSsl=True does not allow simple-ssl HTTP
self.assertRaises(Exception, self.assert_get, "http://localhost:%s" % r.ports[1])
# authenticatePeer=True requires a client cert
self.assertRaises((URLError, ssl.SSLError), self.assert_get, "https://localhost:%s" % r.ports[2])
# could not provide key password on python 2.6
if sys.version_info >= (2, 7):
# Provide client cert
self.assert_get_cert("https://localhost:%d" % r.ports[2])
if __name__ == '__main__':