| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import absolute_import |
| |
| import time |
| |
| from proton.reactor import Container, ApplicationEvent, EventInjector, Selector, Backoff |
| from proton.handlers import Handshaker, MessagingHandler |
| from proton import Handler, Url, symbol |
| |
| from .common import Test, SkipTest, TestServer, free_tcp_port, free_tcp_ports, ensureCanTestExtendedSASL |
| |
| |
| class Barf(Exception): |
| pass |
| |
| |
| class BarfOnInit: |
| |
| def on_reactor_init(self, event): |
| raise Barf() |
| |
| def on_connection_init(self, event): |
| raise Barf() |
| |
| def on_session_init(self, event): |
| raise Barf() |
| |
| def on_link_init(self, event): |
| raise Barf() |
| |
| |
| class BarfOnTask: |
| |
| def on_timer_task(self, event): |
| raise Barf() |
| |
| |
| class BarfOnFinal: |
| init = False |
| |
| def on_reactor_init(self, event): |
| self.init = True |
| |
| def on_reactor_final(self, event): |
| raise Barf() |
| |
| |
| class BarfOnFinalDerived(Handshaker): |
| init = False |
| |
| def on_reactor_init(self, event): |
| self.init = True |
| |
| def on_reactor_final(self, event): |
| raise Barf() |
| |
| |
| class ExceptionTest(Test): |
| |
| def setUp(self): |
| self.container = Container() |
| |
| def test_reactor_final(self): |
| self.container.global_handler = BarfOnFinal() |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_global_set(self): |
| self.container.global_handler = BarfOnInit() |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_global_add(self): |
| self.container.global_handler.add(BarfOnInit()) |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_reactor_set(self): |
| self.container.handler = BarfOnInit() |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_reactor_add(self): |
| self.container.handler.add(BarfOnInit()) |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_connection(self): |
| self.container.connection(BarfOnInit()) |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_connection_set(self): |
| c = self.container.connection() |
| c.handler = BarfOnInit() |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_connection_add(self): |
| c = self.container.connection() |
| c.handler = object() |
| c.handler.add(BarfOnInit()) |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_session_set(self): |
| c = self.container.connection() |
| s = c.session() |
| s.handler = BarfOnInit() |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_session_add(self): |
| c = self.container.connection() |
| s = c.session() |
| s.handler = object() |
| s.handler.add(BarfOnInit()) |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_link_set(self): |
| c = self.container.connection() |
| s = c.session() |
| l = s.sender("xxx") |
| l.handler = BarfOnInit() |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_link_add(self): |
| c = self.container.connection() |
| s = c.session() |
| l = s.sender("xxx") |
| l.handler = object() |
| l.handler.add(BarfOnInit()) |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_schedule(self): |
| self.container.schedule(0, BarfOnTask()) |
| try: |
| self.container.run() |
| assert False, "expected to barf" |
| except Barf: |
| pass |
| |
| def test_schedule_event(self): |
| class Nothing: |
| def __init__(self, p): |
| self.parent = p |
| |
| results = [] |
| |
| def on_timer_task(self, event): |
| self.parent.triggered = True |
| assert event.context == self.parent.task |
| assert event.container == self.parent.container |
| self.task = self.container.schedule(0, Nothing(self)) |
| self.container.run() |
| assert self.triggered |
| |
| def test_schedule_many_nothings(self): |
| class Nothing: |
| results = [] |
| |
| def on_timer_task(self, event): |
| self.results.append(None) |
| num = 12345 |
| for a in range(num): |
| self.container.schedule(0, Nothing()) |
| self.container.run() |
| assert len(Nothing.results) == num |
| |
| def test_schedule_many_nothing_refs(self): |
| class Nothing: |
| results = [] |
| |
| def on_timer_task(self, event): |
| self.results.append(None) |
| num = 12345 |
| tasks = [] |
| for a in range(num): |
| tasks.append(self.container.schedule(0, Nothing())) |
| self.container.run() |
| assert len(Nothing.results) == num |
| |
| def test_schedule_many_nothing_refs_cancel_before_run(self): |
| class Nothing: |
| results = [] |
| |
| def on_timer_task(self, event): |
| self.results.append(None) |
| num = 12345 |
| tasks = [] |
| for a in range(num): |
| tasks.append(self.container.schedule(0, Nothing())) |
| for task in tasks: |
| task.cancel() |
| self.container.run() |
| assert len(Nothing.results) == 0 |
| |
| def test_schedule_cancel(self): |
| barf = self.container.schedule(10, BarfOnTask()) |
| |
| class CancelBarf: |
| def __init__(self, barf): |
| self.barf = barf |
| |
| def on_timer_task(self, event): |
| self.barf.cancel() |
| pass |
| self.container.schedule(0, CancelBarf(barf)) |
| start = time.time() |
| try: |
| self.container.run() |
| elapsed = time.time() - start |
| assert elapsed < 10, "should have cancelled immediately, took %ss" % elapsed |
| except Barf: |
| assert False, "expected barf to be cancelled" |
| |
| def test_schedule_cancel_many(self): |
| num = 12345 |
| barfs = set() |
| for a in range(num): |
| barf = self.container.schedule(10 * (a + 1), BarfOnTask()) |
| |
| class CancelBarf: |
| def __init__(self, barf): |
| self.barf = barf |
| |
| def on_timer_task(self, event): |
| self.barf.cancel() |
| barfs.discard(self.barf) |
| pass |
| self.container.schedule(0, CancelBarf(barf)) |
| barfs.add(barf) |
| start = time.time() |
| try: |
| self.container.run() |
| elapsed = time.time() - start |
| assert elapsed < num, "expected cancelled task to not delay the reactor by %s" % elapsed |
| assert not barfs, "expected all barfs to be discarded" |
| except Barf: |
| assert False, "expected barf to be cancelled" |
| |
| |
| class ApplicationEventTest(Test): |
| """Test application defined events and handlers.""" |
| |
| class MyTestServer(TestServer): |
| def __init__(self): |
| super(ApplicationEventTest.MyTestServer, self).__init__() |
| |
| class MyHandler(Handler): |
| def __init__(self, test): |
| super(ApplicationEventTest.MyHandler, self).__init__() |
| self._test = test |
| |
| def on_hello(self, event): |
| # verify PROTON-1056 |
| self._test.hello_rcvd = str(event) |
| |
| def on_goodbye(self, event): |
| self._test.goodbye_rcvd = str(event) |
| |
| def setUp(self): |
| import os |
| if not hasattr(os, 'pipe'): |
| # KAG: seems like Jython doesn't have an os.pipe() method |
| raise SkipTest() |
| if os.name == "nt": |
| # Correct implementation on Windows is complicated |
| raise SkipTest("PROTON-1071") |
| self.server = ApplicationEventTest.MyTestServer() |
| self.server.reactor.handler.add(ApplicationEventTest.MyHandler(self)) |
| self.event_injector = EventInjector() |
| self.hello_event = ApplicationEvent("hello") |
| self.goodbye_event = ApplicationEvent("goodbye") |
| self.server.reactor.selectable(self.event_injector) |
| self.hello_rcvd = None |
| self.goodbye_rcvd = None |
| self.server.start() |
| |
| def tearDown(self): |
| self.server.stop() |
| |
| def _wait_for(self, predicate, timeout=10.0): |
| deadline = time.time() + timeout |
| while time.time() < deadline: |
| if predicate(): |
| break |
| time.sleep(0.1) |
| assert predicate() |
| |
| def test_application_events(self): |
| self.event_injector.trigger(self.hello_event) |
| self._wait_for(lambda: self.hello_rcvd is not None) |
| self.event_injector.trigger(self.goodbye_event) |
| self._wait_for(lambda: self.goodbye_rcvd is not None) |
| |
| |
| class AuthenticationTestHandler(MessagingHandler): |
| def __init__(self): |
| super(AuthenticationTestHandler, self).__init__() |
| port = free_tcp_port() |
| self.url = "localhost:%i" % port |
| self.verified = False |
| |
| def on_start(self, event): |
| self.listener = event.container.listen(self.url) |
| |
| def on_connection_opened(self, event): |
| event.connection.close() |
| |
| def on_connection_opening(self, event): |
| assert event.connection.transport.user == "user@proton" |
| self.verified = True |
| |
| def on_connection_closed(self, event): |
| event.connection.close() |
| self.listener.close() |
| |
| def on_connection_error(self, event): |
| event.connection.close() |
| self.listener.close() |
| |
| |
| class ContainerTest(Test): |
| """Test container subclass of reactor.""" |
| |
| def test_event_has_container_attribute(self): |
| ensureCanTestExtendedSASL() |
| |
| class TestHandler(MessagingHandler): |
| def __init__(self): |
| super(TestHandler, self).__init__() |
| port = free_tcp_port() |
| self.url = "localhost:%i" % port |
| |
| def on_start(self, event): |
| self.listener = event.container.listen(self.url) |
| |
| def on_connection_closing(self, event): |
| event.connection.close() |
| self.listener.close() |
| test_handler = TestHandler() |
| container = Container(test_handler) |
| |
| class ConnectionHandler(MessagingHandler): |
| def __init__(self): |
| super(ConnectionHandler, self).__init__() |
| |
| def on_connection_opened(self, event): |
| event.connection.close() |
| assert event.container is event.reactor |
| assert event.container is container |
| container.connect(test_handler.url, handler=ConnectionHandler()) |
| container.run() |
| |
| def test_authentication_via_url(self): |
| ensureCanTestExtendedSASL() |
| test_handler = AuthenticationTestHandler() |
| container = Container(test_handler) |
| container.connect("%s:password@%s" % ("user%40proton", test_handler.url), reconnect=False) |
| container.run() |
| assert test_handler.verified |
| |
| def test_authentication_via_container_attributes(self): |
| ensureCanTestExtendedSASL() |
| test_handler = AuthenticationTestHandler() |
| container = Container(test_handler) |
| container.user = "user@proton" |
| container.password = "password" |
| container.connect(test_handler.url, reconnect=False) |
| container.run() |
| assert test_handler.verified |
| |
| def test_authentication_via_kwargs(self): |
| ensureCanTestExtendedSASL() |
| test_handler = AuthenticationTestHandler() |
| container = Container(test_handler) |
| container.connect(test_handler.url, user="user@proton", password="password", reconnect=False) |
| container.run() |
| assert test_handler.verified |
| |
| class _ServerHandler(MessagingHandler): |
| def __init__(self, host): |
| super(ContainerTest._ServerHandler, self).__init__() |
| self.host = host |
| self.port = free_tcp_port() |
| self.client_addr = None |
| self.peer_hostname = None |
| |
| def on_start(self, event): |
| self.listener = event.container.listen("%s:%s" % (self.host, self.port)) |
| |
| def on_connection_opened(self, event): |
| self.client_addr = event.connected_address |
| self.peer_hostname = event.connection.remote_hostname |
| |
| def on_connection_closing(self, event): |
| event.connection.close() |
| self.listener.close() |
| |
| class _ClientHandler(MessagingHandler): |
| def __init__(self): |
| super(ContainerTest._ClientHandler, self).__init__() |
| self.server_addr = None |
| self.errors = 0 |
| |
| def on_connection_opened(self, event): |
| self.server_addr = event.connected_address |
| event.connection.close() |
| |
| def on_transport_error(self, event): |
| self.errors += 1 |
| |
| def test_numeric_hostname(self): |
| ensureCanTestExtendedSASL() |
| server_handler = ContainerTest._ServerHandler("127.0.0.1") |
| client_handler = ContainerTest._ClientHandler() |
| container = Container(server_handler) |
| container.connect(url="127.0.0.1:%s" % (server_handler.port), |
| handler=client_handler) |
| container.run() |
| assert server_handler.client_addr |
| assert client_handler.server_addr |
| assert server_handler.peer_hostname == "127.0.0.1", server_handler.peer_hostname |
| assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port) |
| |
| def test_non_numeric_hostname(self): |
| ensureCanTestExtendedSASL() |
| server_handler = ContainerTest._ServerHandler("localhost") |
| client_handler = ContainerTest._ClientHandler() |
| container = Container(server_handler) |
| container.connect(url="localhost:%s" % (server_handler.port), |
| handler=client_handler) |
| container.run() |
| assert server_handler.client_addr |
| assert client_handler.server_addr |
| assert server_handler.peer_hostname == "localhost", server_handler.peer_hostname |
| assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port) |
| |
| def test_virtual_host(self): |
| ensureCanTestExtendedSASL() |
| server_handler = ContainerTest._ServerHandler("localhost") |
| container = Container(server_handler) |
| conn = container.connect(url="localhost:%s" % (server_handler.port), |
| handler=ContainerTest._ClientHandler(), |
| virtual_host="a.b.c.org") |
| container.run() |
| assert server_handler.peer_hostname == "a.b.c.org", server_handler.peer_hostname |
| |
| def test_no_virtual_host(self): |
| # explicitly setting an empty virtual host should prevent the hostname |
| # field from being sent in the Open performative when using the |
| # Python Container. |
| server_handler = ContainerTest._ServerHandler("localhost") |
| container = Container(server_handler) |
| conn = container.connect(url="localhost:%s" % (server_handler.port), |
| handler=ContainerTest._ClientHandler(), |
| virtual_host="") |
| container.run() |
| assert server_handler.peer_hostname is None, server_handler.peer_hostname |
| |
| class _ReconnectServerHandler(MessagingHandler): |
| def __init__(self, host, listen_on_error=True): |
| super(ContainerTest._ReconnectServerHandler, self).__init__() |
| self.host = host |
| self.port = free_tcp_port() |
| self.client_addr = None |
| self.peer_hostname = None |
| self.listen_on_error = listen_on_error |
| |
| def on_connection_opened(self, event): |
| self.client_addr = event.connected_address |
| self.peer_hostname = event.connection.remote_hostname |
| self.listener.close() |
| |
| def on_connection_closing(self, event): |
| event.connection.close() |
| |
| def listen(self, container): |
| if self.listen_on_error: |
| self.listener = container.listen("%s:%s" % (self.host, self.port)) |
| |
| class _ReconnectClientHandler(MessagingHandler): |
| def __init__(self, server_handler): |
| super(ContainerTest._ReconnectClientHandler, self).__init__() |
| self.connect_failed = False |
| self.server_addr = None |
| self.server_handler = server_handler |
| |
| def on_connection_opened(self, event): |
| self.server_addr = event.connected_address |
| event.connection.close() |
| |
| def on_transport_error(self, event): |
| assert self.connect_failed == False |
| self.connect_failed = True |
| self.server_handler.listen(event.container) |
| |
| def test_failover(self): |
| server_handler = ContainerTest._ServerHandler("localhost") |
| client_handler = ContainerTest._ClientHandler() |
| free_ports = free_tcp_ports(2) |
| container = Container(server_handler) |
| container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1]), |
| "localhost:%s" % (server_handler.port)], |
| handler=client_handler) |
| container.run() |
| assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname |
| assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr |
| |
| def test_failover_fail(self): |
| client_handler = ContainerTest._ClientHandler() |
| free_ports = free_tcp_ports(2) |
| container = Container(client_handler) |
| start = time.time() |
| container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])], |
| reconnect=Backoff(max_tries=5), |
| handler=client_handler) |
| container.run() |
| end = time.time() |
| assert client_handler.errors == 10 |
| # Total time for failure should be greater than but close to 3s |
| # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time! |
| assert 3.0 < end - start, end - start |
| assert client_handler.server_addr is None, client_handler.server_addr |
| |
| def test_failover_fail_custom_reconnect(self): |
| client_handler = ContainerTest._ClientHandler() |
| free_ports = free_tcp_ports(2) |
| container = Container(client_handler) |
| start = time.time() |
| container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])], |
| reconnect=[0, 0.5, 1], |
| handler=client_handler) |
| container.run() |
| end = time.time() |
| assert client_handler.errors == 6 |
| # Total time for failure should be greater than but close to 3s |
| # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time! |
| assert 3.0 < end - start, end - start |
| assert client_handler.server_addr is None, client_handler.server_addr |
| |
| def test_reconnect(self): |
| server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=True) |
| client_handler = ContainerTest._ReconnectClientHandler(server_handler) |
| container = Container(server_handler) |
| container.connect(url="localhost:%s" % (server_handler.port), |
| handler=client_handler) |
| container.run() |
| assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname |
| assert client_handler.connect_failed |
| assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr |
| |
| def test_not_reconnecting(self): |
| server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=False) |
| client_handler = ContainerTest._ReconnectClientHandler(server_handler) |
| container = Container(server_handler) |
| container.connect(url="localhost:%s" % (server_handler.port), |
| handler=client_handler, reconnect=False) |
| container.run() |
| assert server_handler.peer_hostname is None, server_handler.peer_hostname |
| assert client_handler.connect_failed |
| assert client_handler.server_addr is None, client_handler.server_addr |
| |
| |
| class SelectorTest(Test): |
| """Test the Selector""" |
| |
| def test_unicode_selector(self): |
| assert Selector(u"Hello").filter_set[symbol('selector')].value == u"Hello" |
| |
| def test_non_unicode_selector(self): |
| assert Selector(b"Hello").filter_set[symbol('selector')].value == u"Hello" |