| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import absolute_import |
| |
| import os |
| import gc |
| import traceback |
| |
| from proton import * |
| from proton.reactor import Container |
| |
| from . import common |
| |
| CUSTOM = EventType("custom") |
| |
| |
| class HandlerTest(common.Test): |
| def test_reactorHandlerCycling(self, n=0): |
| |
| class CustomHandler(Handler): |
| UNSET = 999999999 |
| |
| def __init__(self): |
| self.offset = len(traceback.extract_stack()) |
| |
| def on_reactor_init(self, event): |
| self.depth = len(traceback.extract_stack()) |
| |
| def reset(self): |
| self.depth = self.UNSET |
| |
| @property |
| def init_depth(self): |
| d = self.depth - self.offset |
| return d |
| custom = CustomHandler() |
| |
| container = Container() |
| container.handler = custom |
| for i in range(n): |
| h = container.handler |
| container.handler = h |
| custom.reset() |
| container.run() |
| assert custom.init_depth < 50, "Unexpectedly long traceback for a simple handler" |
| |
| def test_reactorHandlerCycling10k(self): |
| self.test_reactorHandlerCycling(10000) |
| |
| def test_reactorHandlerCycling100(self): |
| self.test_reactorHandlerCycling(100) |
| |
| def do_customEvent(self, reactor_handler, event_root): |
| |
| class CustomHandler: |
| did_custom = False |
| did_init = False |
| |
| def __init__(self, *handlers): |
| self.handlers = handlers |
| |
| def on_reactor_init(self, event): |
| self.did_init = True |
| |
| def on_custom(self, event): |
| self.did_custom = True |
| |
| class CustomInvoker(CustomHandler): |
| def on_reactor_init(self, event): |
| h = event_root(event) |
| event.dispatch(h, CUSTOM) |
| self.did_init = True |
| |
| child = CustomInvoker() |
| root = CustomHandler(child) |
| |
| container = Container() |
| |
| reactor_handler(container, root) |
| container.run() |
| assert root.did_init |
| assert child.did_init |
| assert root.did_custom |
| assert child.did_custom |
| |
| def set_root(self, reactor, root): |
| reactor.handler = root |
| |
| def add_root(self, reactor, root): |
| reactor.handler.add(root) |
| |
| def append_root(self, reactor, root): |
| reactor.handler.handlers.append(root) |
| |
| def event_root(self, event): |
| return event.handler |
| |
| def event_reactor_handler(self, event): |
| return event.reactor.handler |
| |
| def test_set_handler(self): |
| self.do_customEvent(self.set_root, self.event_reactor_handler) |
| |
| def test_add_handler(self): |
| self.do_customEvent(self.add_root, self.event_reactor_handler) |
| |
| def test_append_handler(self): |
| self.do_customEvent(self.append_root, self.event_reactor_handler) |
| |
| def test_set_root(self): |
| self.do_customEvent(self.set_root, self.event_root) |
| |
| def test_add_root(self): |
| self.do_customEvent(self.add_root, self.event_root) |
| |
| def test_append_root(self): |
| self.do_customEvent(self.append_root, self.event_root) |