# blob: 39935f7924f1c93fd01f58944780d10989476be5 [file] [log] [blame]
from __future__ import absolute_import
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import time
import sys
from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
from proton.reactor import Container, Reactor, ApplicationEvent, EventInjector
from proton.handlers import CHandshaker, MessagingHandler
from proton import Handler, Url
class Barf(Exception):
    """Marker exception raised on purpose by the misbehaving test handlers."""
class BarfOnInit:
    """Handler that raises ``Barf`` from every ``*_init`` callback it defines."""

    def on_reactor_init(self, event):
        # Blow up when the reactor init event is dispatched to this handler.
        raise Barf()

    def on_connection_init(self, event):
        # Blow up when a connection init event is dispatched to this handler.
        raise Barf()

    def on_session_init(self, event):
        # Blow up when a session init event is dispatched to this handler.
        raise Barf()

    def on_link_init(self, event):
        # Blow up when a link init event is dispatched to this handler.
        raise Barf()
class BarfOnTask:
    """Handler that raises ``Barf`` when its timer task callback is invoked."""

    def on_timer_task(self, event):
        # The event itself is irrelevant; being called at all is the failure.
        raise Barf()
class BarfOnFinal:
    """Handler that notes reactor init and raises ``Barf`` on reactor final."""

    # Class-level default; on_reactor_init shadows it with an instance
    # attribute set to True.
    init = False

    def on_reactor_init(self, event):
        # Record that the init callback reached this handler.
        self.init = True

    def on_reactor_final(self, event):
        raise Barf()
class BarfOnFinalDerived(CHandshaker):
    """``CHandshaker`` subclass that notes reactor init and barfs on final."""

    # Class-level default; on_reactor_init shadows it with an instance
    # attribute set to True.
    init = False

    def on_reactor_init(self, event):
        # Record that the init callback reached this handler.
        self.init = True

    def on_reactor_final(self, event):
        raise Barf()
class ExceptionTest(Test):
    """Check that exceptions raised by handlers escape ``Reactor.run()``.

    Each ``test_*`` method attaches a handler that raises ``Barf`` at a
    different point (global handler, reactor handler, connection, session,
    link, scheduled task) and expects ``run()`` to raise it.  The scheduling
    tests also cover running and cancelling large numbers of timer tasks.
    """

    def setUp(self):
        self.reactor = Reactor()

    def _assert_run_barfs(self):
        """Run the reactor and fail unless the run raises ``Barf``."""
        try:
            self.reactor.run()
        except Barf:
            return
        assert False, "expected to barf"

    def _new_nothing_class(self):
        """Return a fresh handler class that counts the timer tasks it sees.

        ``results`` is a class attribute, so every instance of the returned
        class appends to the same list.  A new class is built per call so
        each test gets its own list.
        """
        class Nothing:
            results = []

            def on_timer_task(self, event):
                self.results.append(None)

        return Nothing

    def test_reactor_final(self):
        self.reactor.global_handler = BarfOnFinal()
        self._assert_run_barfs()

    def test_global_set(self):
        self.reactor.global_handler = BarfOnInit()
        self._assert_run_barfs()

    def test_global_add(self):
        self.reactor.global_handler.add(BarfOnInit())
        self._assert_run_barfs()

    def test_reactor_set(self):
        self.reactor.handler = BarfOnInit()
        self._assert_run_barfs()

    def test_reactor_add(self):
        self.reactor.handler.add(BarfOnInit())
        self._assert_run_barfs()

    def test_connection(self):
        self.reactor.connection(BarfOnInit())
        self._assert_run_barfs()

    def test_connection_set(self):
        c = self.reactor.connection()
        c.handler = BarfOnInit()
        self._assert_run_barfs()

    def test_connection_add(self):
        c = self.reactor.connection()
        # Install a plain object as the handler first, then add the barfing
        # handler through the handler's add() method.
        c.handler = object()
        c.handler.add(BarfOnInit())
        self._assert_run_barfs()

    def test_session_set(self):
        c = self.reactor.connection()
        s = c.session()
        s.handler = BarfOnInit()
        self._assert_run_barfs()

    def test_session_add(self):
        c = self.reactor.connection()
        s = c.session()
        s.handler = object()
        s.handler.add(BarfOnInit())
        self._assert_run_barfs()

    def test_link_set(self):
        c = self.reactor.connection()
        s = c.session()
        l = s.sender("xxx")
        l.handler = BarfOnInit()
        self._assert_run_barfs()

    def test_link_add(self):
        c = self.reactor.connection()
        s = c.session()
        l = s.sender("xxx")
        l.handler = object()
        l.handler.add(BarfOnInit())
        self._assert_run_barfs()

    def test_schedule(self):
        self.reactor.schedule(0, BarfOnTask())
        self._assert_run_barfs()

    def test_schedule_many_nothings(self):
        Nothing = self._new_nothing_class()
        num = 12345
        for _ in range(num):
            self.reactor.schedule(0, Nothing())
        self.reactor.run()
        assert len(Nothing.results) == num

    def test_schedule_many_nothing_refs(self):
        Nothing = self._new_nothing_class()
        num = 12345
        # Unlike test_schedule_many_nothings, hold on to every object
        # returned by schedule() while the reactor runs.
        tasks = []
        for _ in range(num):
            tasks.append(self.reactor.schedule(0, Nothing()))
        self.reactor.run()
        assert len(Nothing.results) == num

    def test_schedule_many_nothing_refs_cancel_before_run(self):
        Nothing = self._new_nothing_class()
        num = 12345
        tasks = []
        for _ in range(num):
            tasks.append(self.reactor.schedule(0, Nothing()))
        for task in tasks:
            task.cancel()
        self.reactor.run()
        assert len(Nothing.results) == 0

    def test_schedule_cancel(self):
        barf = self.reactor.schedule(10, BarfOnTask())

        class CancelBarf:
            """Timer handler that cancels the task it was given."""

            def __init__(self, barf):
                self.barf = barf

            def on_timer_task(self, event):
                self.barf.cancel()

        self.reactor.schedule(0, CancelBarf(barf))
        now = self.reactor.mark()
        try:
            self.reactor.run()
        except Barf:
            assert False, "expected barf to be cancelled"
        elapsed = self.reactor.mark() - now
        assert elapsed < 10, "expected cancelled task to not delay the reactor by %s" % elapsed

    def test_schedule_cancel_many(self):
        num = 12345
        barfs = set()

        # Defined once rather than once per loop iteration; it only closes
        # over ``barfs``, which does not change identity inside the loop.
        class CancelBarf:
            """Timer handler that cancels its task and forgets it in barfs."""

            def __init__(self, barf):
                self.barf = barf

            def on_timer_task(self, event):
                self.barf.cancel()
                barfs.discard(self.barf)

        for a in range(num):
            barf = self.reactor.schedule(10 * (a + 1), BarfOnTask())
            self.reactor.schedule(0, CancelBarf(barf))
            barfs.add(barf)
        now = self.reactor.mark()
        try:
            self.reactor.run()
        except Barf:
            assert False, "expected barf to be cancelled"
        elapsed = self.reactor.mark() - now
        assert elapsed < num, "expected cancelled task to not delay the reactor by %s" % elapsed
        assert not barfs, "expected all barfs to be discarded"
class HandlerDerivationTest(Test):
    """Exercise handlers built from plain classes and ``CHandshaker`` subclasses.

    The ``test_reactor_final_*`` tests expect ``Barf`` to come out of
    ``Reactor.run()``; the ``test_reactor_init_*`` tests expect the run to
    fail and the handler's ``on_reactor_init`` to have been called.  All of
    them are skipped unless running on Jython (see ``setUp``).
    """

    def setUp(self):
        import platform
        if platform.python_implementation() != "Jython":
            # Exception propagation does not work currently for CPython
            raise SkipTest()
        self.reactor = Reactor()

    def wrong_exception(self):
        """Fail the test, reporting the exception currently being handled.

        Must be called from inside an ``except`` block so that
        ``sys.exc_info()`` has something to report.
        """
        import sys
        ex = sys.exc_info()
        assert False, " Unexpected exception " + str(ex[1])

    def _assert_run_barfs(self):
        """Run the reactor; pass only if ``Barf`` is raised.

        The "expected to barf" assertion lives in the ``else`` clause so the
        bare ``except`` cannot intercept it and report it as an unexpected
        exception.
        """
        try:
            self.reactor.run()
        except Barf:
            pass
        except:
            self.wrong_exception()
        else:
            assert False, "expected to barf"

    def _assert_run_fails_after_init(self, h):
        """Run the reactor; require a failure and that ``h`` saw reactor init.

        The "expected to barf" assertion lives in the ``else`` clause;
        inside the ``try`` body it would be caught by the bare ``except``
        and a run that never raised would be accepted.
        """
        try:
            self.reactor.run()
        except:
            # Bare except kept on purpose: these tests accept any exception
            # type and only check that init was dispatched before it.
            assert h.init, "excpected the init"
        else:
            assert False, "expected to barf"

    def test_reactor_final_derived(self):
        h = BarfOnFinalDerived()
        self.reactor.global_handler = h
        self._assert_run_barfs()

    def test_reactor_final_py_child_py(self):
        class APoorExcuseForAHandler:
            def __init__(self):
                self.handlers = [BarfOnFinal()]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_barfs()

    def test_reactor_final_py_child_derived(self):
        class APoorExcuseForAHandler:
            def __init__(self):
                self.handlers = [BarfOnFinalDerived()]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_barfs()

    def test_reactor_final_derived_child_derived(self):
        class APoorExcuseForAHandler(CHandshaker):
            def __init__(self):
                CHandshaker.__init__(self)
                self.handlers = [BarfOnFinalDerived()]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_barfs()

    def test_reactor_final_derived_child_py(self):
        class APoorExcuseForAHandler(CHandshaker):
            def __init__(self):
                CHandshaker.__init__(self)
                self.handlers = [BarfOnFinal()]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_barfs()

    def test_reactor_init_derived(self):
        h = BarfOnFinalDerived()
        self.reactor.global_handler = h
        self._assert_run_fails_after_init(h)

    def test_reactor_init_py_child_py(self):
        h = BarfOnFinal()

        class APoorExcuseForAHandler:
            def __init__(self):
                self.handlers = [h]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_fails_after_init(h)

    def test_reactor_init_py_child_derived(self):
        h = BarfOnFinalDerived()

        class APoorExcuseForAHandler:
            def __init__(self):
                self.handlers = [h]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_fails_after_init(h)

    def test_reactor_init_derived_child_derived(self):
        h = BarfOnFinalDerived()

        class APoorExcuseForAHandler(CHandshaker):
            def __init__(self):
                CHandshaker.__init__(self)
                self.handlers = [h]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_fails_after_init(h)

    def test_reactor_init_derived_child_py(self):
        h = BarfOnFinal()

        class APoorExcuseForAHandler(CHandshaker):
            def __init__(self):
                CHandshaker.__init__(self)
                self.handlers = [h]

        self.reactor.global_handler = APoorExcuseForAHandler()
        self._assert_run_fails_after_init(h)
class ApplicationEventTest(Test):
    """Test application defined events and handlers."""

    class MyTestServer(TestServer):
        """``TestServer`` used by this test; adds nothing of its own."""

        def __init__(self):
            super(ApplicationEventTest.MyTestServer, self).__init__()

    class MyHandler(Handler):
        """Store the string form of each application event on the test."""

        def __init__(self, test):
            super(ApplicationEventTest.MyHandler, self).__init__()
            self._test = test

        def on_hello(self, event):
            # verify PROTON-1056
            self._test.hello_rcvd = str(event)

        def on_goodbye(self, event):
            self._test.goodbye_rcvd = str(event)

    def setUp(self):
        import os
        if not hasattr(os, 'pipe'):
            # KAG: seems like Jython doesn't have an os.pipe() method
            raise SkipTest()
        if os.name == "nt":
            # Correct implementation on Windows is complicated
            raise SkipTest("PROTON-1071")

        server = ApplicationEventTest.MyTestServer()
        self.server = server
        server.reactor.handler.add(ApplicationEventTest.MyHandler(self))

        injector = EventInjector()
        self.event_injector = injector
        self.hello_event = ApplicationEvent("hello")
        self.goodbye_event = ApplicationEvent("goodbye")
        server.reactor.selectable(injector)

        # Filled in by MyHandler once the matching event is dispatched.
        self.hello_rcvd = None
        self.goodbye_rcvd = None
        server.start()

    def tearDown(self):
        self.server.stop()

    def _wait_for(self, predicate, timeout=10.0):
        """Poll ``predicate`` every 0.1s until true or ``timeout`` seconds pass.

        Asserts that ``predicate()`` is true once polling stops.
        """
        deadline = time.time() + timeout
        while time.time() < deadline and not predicate():
            time.sleep(0.1)
        assert predicate()

    def test_application_events(self):
        inject = self.event_injector.trigger
        inject(self.hello_event)
        self._wait_for(lambda: self.hello_rcvd is not None)
        inject(self.goodbye_event)
        self._wait_for(lambda: self.goodbye_rcvd is not None)
class AuthenticationTestHandler(MessagingHandler):
    """Listen on a free local port and check the connecting user's identity."""

    def __init__(self):
        super(AuthenticationTestHandler, self).__init__()
        self.url = "localhost:%i" % free_tcp_port()
        # Set to True once on_connection_opening has seen the expected user.
        self.verified = False

    def on_start(self, event):
        # Keep the listener so the close/error callbacks can shut it down.
        self.listener = event.container.listen(self.url)

    def on_connection_opened(self, event):
        event.connection.close()

    def on_connection_opening(self, event):
        user = event.connection.transport.user
        assert user == "user@proton"
        self.verified = True

    def on_connection_closed(self, event):
        event.connection.close()
        self.listener.close()

    def on_connection_error(self, event):
        event.connection.close()
        self.listener.close()