blob: e8fdebdd1422ad676c449185aca91ab49efa4725 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import os
import gc
import traceback
from proton import *
from proton.reactor import Container
from . import common
CUSTOM = EventType("custom")
class HandlerTest(common.Test):
def test_reactorHandlerCycling(self, n=0):
class CustomHandler(Handler):
UNSET = 999999999
def __init__(self):
self.offset = len(traceback.extract_stack())
def on_reactor_init(self, event):
self.depth = len(traceback.extract_stack())
def reset(self):
self.depth = self.UNSET
def init_depth(self):
d = self.depth - self.offset
return d
custom = CustomHandler()
container = Container()
container.handler = custom
for i in range(n):
h = container.handler
container.handler = h
assert custom.init_depth < 50, "Unexpectedly long traceback for a simple handler"
def test_reactorHandlerCycling10k(self):
def test_reactorHandlerCycling100(self):
def do_customEvent(self, reactor_handler, event_root):
class CustomHandler:
did_custom = False
did_init = False
def __init__(self, *handlers):
self.handlers = handlers
def on_reactor_init(self, event):
self.did_init = True
def on_custom(self, event):
self.did_custom = True
class CustomInvoker(CustomHandler):
def on_reactor_init(self, event):
h = event_root(event)
event.dispatch(h, CUSTOM)
self.did_init = True
child = CustomInvoker()
root = CustomHandler(child)
container = Container()
reactor_handler(container, root)
assert root.did_init
assert child.did_init
assert root.did_custom
assert child.did_custom
def set_root(self, reactor, root):
reactor.handler = root
def add_root(self, reactor, root):
def append_root(self, reactor, root):
def event_root(self, event):
return event.handler
def event_reactor_handler(self, event):
return event.reactor.handler
def test_set_handler(self):
self.do_customEvent(self.set_root, self.event_reactor_handler)
def test_add_handler(self):
self.do_customEvent(self.add_root, self.event_reactor_handler)
def test_append_handler(self):
self.do_customEvent(self.append_root, self.event_reactor_handler)
def test_set_root(self):
self.do_customEvent(self.set_root, self.event_root)
def test_add_root(self):
self.do_customEvent(self.add_root, self.event_root)
def test_append_root(self):
self.do_customEvent(self.append_root, self.event_root)