| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import unicode_literals |
| from __future__ import division |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| from proton import Message, Timeout |
| from system_test import TestCase, Qdrouterd, main_module, TIMEOUT |
| from system_test import unittest |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container, DynamicNodeProperties |
| from qpid_dispatch_internal.compat import UNICODE |
| from qpid_dispatch.management.client import Node |
| |
| |
class RouterTest(TestCase):
    """Multi-tenancy tests run against two interior routers named 'A' and 'B'.

    Each router is configured with three client-facing listeners, declared in
    this order: a plain listener, a ``multiTenant`` listener and a
    ``route-container`` listener.  Tests select them through
    ``routers[i].addresses[j]``.

    NOTE(review): this presumes ``addresses`` follows the listener declaration
    order of the configuration -- confirm against ``Qdrouterd``.
    """

    inter_router_port = None

    @classmethod
    def setUpClass(cls):
        """Launch routers 'A' and 'B' and wait until each one sees the other."""
        super(RouterTest, cls).setUpClass()

        cls.routers = []

        def launch(name, connection):
            # Both routers share the same listeners, link routes, auto links
            # and address settings; only the trailing inter-router entry
            # (listener on A, connector on B) differs.
            settings = Qdrouterd.Config([
                ('router', {'mode': 'interior', 'id': name}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
                ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'in', 'containerId': 'LRC'}),
                ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'out', 'containerId': 'LRC'}),
                ('autoLink', {'address': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'in'}),
                ('autoLink', {'address': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'out'}),
                ('autoLink', {'address': '0.0.0.0/queue.ext', 'containerId': 'ALCE', 'direction': 'in', 'externalAddress': 'EXT'}),
                ('autoLink', {'address': '0.0.0.0/queue.ext', 'containerId': 'ALCE', 'direction': 'out', 'externalAddress': 'EXT'}),
                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
                ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
                connection
            ])
            cls.routers.append(cls.tester.qdrouterd(name, settings, wait=True))

        # This is a local; the class attribute of the same name is left untouched.
        inter_router_port = cls.tester.get_port()

        launch('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
        launch('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, 'verifyHostname': 'no'}))

        router_a, router_b = cls.routers
        router_a.wait_router_connected('B')
        router_b.wait_router_connected('A')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _plain(self, router):
        """Return address 0 (first declared listener) of ``routers[router]``."""
        return self.routers[router].addresses[0]

    def _tenant(self, router):
        """Return address 1 (second declared listener, multiTenant) of ``routers[router]``."""
        return self.routers[router].addresses[1]

    def _container(self, router):
        """Return address 2 (third declared listener, route-container) of ``routers[router]``."""
        return self.routers[router].addresses[2]

    def _check(self, test):
        """Run the scenario to completion and require that it recorded no error."""
        test.run()
        self.assertEqual(None, test.error)

    # ------------------------------------------------------------------
    # Targeted (addressed) sender
    # ------------------------------------------------------------------

    def test_01_one_router_targeted_sender_no_tenant(self):
        self._check(MessageTransferTest(self._plain(0), self._plain(0),
                                        "anything/addr_01", "anything/addr_01",
                                        self._plain(0), "M0anything/addr_01"))

    def test_02_one_router_targeted_sender_tenant_on_sender(self):
        self._check(MessageTransferTest(self._tenant(0), self._plain(0),
                                        "addr_02", "0.0.0.0/addr_02",
                                        self._plain(0), "M00.0.0.0/addr_02"))

    def test_03_one_router_targeted_sender_tenant_on_receiver(self):
        self._check(MessageTransferTest(self._plain(0), self._tenant(0),
                                        "0.0.0.0/addr_03", "addr_03",
                                        self._plain(0), "M00.0.0.0/addr_03"))

    def test_04_one_router_targeted_sender_tenant_on_both(self):
        self._check(MessageTransferTest(self._tenant(0), self._tenant(0),
                                        "addr_04", "addr_04",
                                        self._plain(0), "M00.0.0.0/addr_04"))

    def test_05_two_router_targeted_sender_no_tenant(self):
        self._check(MessageTransferTest(self._plain(0), self._plain(1),
                                        "0.0.0.0/addr_05", "0.0.0.0/addr_05",
                                        self._plain(0), "M00.0.0.0/addr_05"))

    def test_06_two_router_targeted_sender_tenant_on_sender(self):
        self._check(MessageTransferTest(self._tenant(0), self._plain(1),
                                        "addr_06", "0.0.0.0/addr_06",
                                        self._plain(0), "M00.0.0.0/addr_06"))

    def test_07_two_router_targeted_sender_tenant_on_receiver(self):
        self._check(MessageTransferTest(self._plain(0), self._tenant(1),
                                        "0.0.0.0/addr_07", "addr_07",
                                        self._plain(0), "M00.0.0.0/addr_07"))

    def test_08_two_router_targeted_sender_tenant_on_both(self):
        self._check(MessageTransferTest(self._tenant(0), self._tenant(1),
                                        "addr_08", "addr_08",
                                        self._plain(0), "M00.0.0.0/addr_08"))

    # ------------------------------------------------------------------
    # Anonymous sender
    # ------------------------------------------------------------------

    def test_09_one_router_anonymous_sender_no_tenant(self):
        self._check(MessageTransferAnonTest(self._plain(0), self._plain(0),
                                            "anything/addr_09", "anything/addr_09",
                                            self._plain(0), "M0anything/addr_09"))

    def test_10_one_router_anonymous_sender_tenant_on_sender(self):
        self._check(MessageTransferAnonTest(self._tenant(0), self._plain(0),
                                            "addr_10", "0.0.0.0/addr_10",
                                            self._plain(0), "M00.0.0.0/addr_10"))

    def test_11_one_router_anonymous_sender_tenant_on_receiver(self):
        self._check(MessageTransferAnonTest(self._plain(0), self._tenant(0),
                                            "0.0.0.0/addr_11", "addr_11",
                                            self._plain(0), "M00.0.0.0/addr_11"))

    def test_12_one_router_anonymous_sender_tenant_on_both(self):
        self._check(MessageTransferAnonTest(self._tenant(0), self._tenant(0),
                                            "addr_12", "addr_12",
                                            self._plain(0), "M00.0.0.0/addr_12"))

    def test_13_two_router_anonymous_sender_no_tenant(self):
        self._check(MessageTransferAnonTest(self._plain(0), self._plain(1),
                                            "anything/addr_13", "anything/addr_13",
                                            self._plain(0), "M0anything/addr_13"))

    def test_14_two_router_anonymous_sender_tenant_on_sender(self):
        self._check(MessageTransferAnonTest(self._tenant(0), self._plain(1),
                                            "addr_14", "0.0.0.0/addr_14",
                                            self._plain(0), "M00.0.0.0/addr_14"))

    def test_15_two_router_anonymous_sender_tenant_on_receiver(self):
        self._check(MessageTransferAnonTest(self._plain(0), self._tenant(1),
                                            "0.0.0.0/addr_15", "addr_15",
                                            self._plain(0), "M00.0.0.0/addr_15"))

    def test_16_two_router_anonymous_sender_tenant_on_both(self):
        self._check(MessageTransferAnonTest(self._tenant(0), self._tenant(1),
                                            "addr_16", "addr_16",
                                            self._plain(0), "M00.0.0.0/addr_16"))

    # ------------------------------------------------------------------
    # Link routes
    # ------------------------------------------------------------------

    def test_17_one_router_link_route_targeted(self):
        self._check(LinkRouteTest(self._tenant(0), self._container(0),
                                  "link.addr_17", "0.0.0.0/link.addr_17",
                                  False, self._plain(0)))

    def test_18_one_router_link_route_targeted_no_tenant(self):
        self._check(LinkRouteTest(self._plain(0), self._container(0),
                                  "0.0.0.0/link.addr_18", "0.0.0.0/link.addr_18",
                                  False, self._plain(0)))

    def test_19_one_router_link_route_dynamic(self):
        self._check(LinkRouteTest(self._tenant(0), self._container(0),
                                  "link.addr_19", "0.0.0.0/link.addr_19",
                                  True, self._plain(0)))

    def test_20_one_router_link_route_dynamic_no_tenant(self):
        self._check(LinkRouteTest(self._plain(0), self._container(0),
                                  "0.0.0.0/link.addr_20", "0.0.0.0/link.addr_20",
                                  True, self._plain(0)))

    def test_21_two_router_link_route_targeted(self):
        self._check(LinkRouteTest(self._tenant(0), self._container(1),
                                  "link.addr_21", "0.0.0.0/link.addr_21",
                                  False, self._plain(0)))

    def test_22_two_router_link_route_targeted_no_tenant(self):
        self._check(LinkRouteTest(self._plain(0), self._container(1),
                                  "0.0.0.0/link.addr_22", "0.0.0.0/link.addr_22",
                                  False, self._plain(0)))

    def test_23_two_router_link_route_dynamic(self):
        self._check(LinkRouteTest(self._tenant(0), self._container(1),
                                  "link.addr_23", "0.0.0.0/link.addr_23",
                                  True, self._plain(0)))

    def test_24_two_router_link_route_dynamic_no_tenant(self):
        self._check(LinkRouteTest(self._plain(0), self._container(1),
                                  "0.0.0.0/link.addr_24", "0.0.0.0/link.addr_24",
                                  True, self._plain(0)))

    # ------------------------------------------------------------------
    # Non-mobile addresses (_local / _topo)
    # ------------------------------------------------------------------

    def test_25_one_router_anonymous_sender_non_mobile(self):
        self._check(MessageTransferAnonTest(self._tenant(0), self._plain(0),
                                            "_local/addr_25", "_local/addr_25",
                                            self._plain(0), "Laddr_25"))

    def test_26_one_router_targeted_sender_non_mobile(self):
        self._check(MessageTransferTest(self._tenant(0), self._plain(0),
                                        "_local/addr_26", "_local/addr_26",
                                        self._plain(0), "Laddr_26"))

    def test_27_two_router_anonymous_sender_non_mobile(self):
        self._check(MessageTransferAnonTest(self._tenant(0), self._plain(1),
                                            "_topo/0/B/addr_27", "_local/addr_27",
                                            self._plain(1), "Laddr_27"))

    def test_28_two_router_targeted_sender_non_mobile(self):
        self._check(MessageTransferTest(self._tenant(0), self._plain(1),
                                        "_topo/0/B/addr_28", "_local/addr_28",
                                        self._plain(1), "Laddr_28"))

    # ------------------------------------------------------------------
    # Waypoints (auto links)
    # ------------------------------------------------------------------

    def test_29_one_router_waypoint_no_tenant(self):
        self._check(WaypointTest(self._plain(0), self._container(0),
                                 "0.0.0.0/queue.waypoint", "0.0.0.0/queue.waypoint"))

    def test_30_one_router_waypoint(self):
        self._check(WaypointTest(self._tenant(0), self._container(0),
                                 "queue.waypoint", "0.0.0.0/queue.waypoint"))

    def test_31_two_router_waypoint_no_tenant(self):
        self._check(WaypointTest(self._plain(0), self._container(1),
                                 "0.0.0.0/queue.waypoint", "0.0.0.0/queue.waypoint"))

    def test_32_two_router_waypoint(self):
        self._check(WaypointTest(self._tenant(0), self._container(1),
                                 "queue.waypoint", "0.0.0.0/queue.waypoint"))

    def test_33_one_router_waypoint_no_tenant_external_addr(self):
        self._check(WaypointTest(self._plain(0), self._container(0),
                                 "0.0.0.0/queue.ext", "EXT", "ALCE"))

    def test_34_one_router_waypoint_external_addr(self):
        self._check(WaypointTest(self._tenant(0), self._container(0),
                                 "queue.ext", "EXT", "ALCE"))

    def test_35_two_router_waypoint_no_tenant_external_addr(self):
        self._check(WaypointTest(self._plain(0), self._container(1),
                                 "0.0.0.0/queue.ext", "EXT", "ALCE"))

    def test_36_two_router_waypoint_external_addr(self):
        self._check(WaypointTest(self._tenant(0), self._container(1),
                                 "queue.ext", "EXT", "ALCE"))
| |
| |
class Entity(object):
    """Attribute-style view of a management response.

    ``status_code`` and ``status_description`` are stored as given; every
    other attribute lookup is answered from ``attrs`` by subscripting it with
    the attribute name.

    :param status_code: status code taken from the response.
    :param status_description: status text taken from the response.
    :param attrs: container subscripted for attribute lookups; may be None.
    """

    def __init__(self, status_code, status_description, attrs):
        self.status_code = status_code
        self.status_description = status_description
        self.attrs = attrs

    def __getattr__(self, key):
        """Return ``attrs[key]`` for names not found by normal lookup.

        :raises AttributeError: if ``attrs`` is missing, is None, or does not
            contain ``key``.  Raising AttributeError (rather than leaking
            KeyError/TypeError) keeps ``hasattr`` and ``getattr`` with a
            default working as expected.
        """
        # Read 'attrs' through __dict__: using self.attrs here would re-enter
        # __getattr__ without end on an instance whose 'attrs' is not set yet.
        attrs = self.__dict__.get('attrs')
        try:
            return attrs[key]
        except (KeyError, IndexError, TypeError):
            raise AttributeError("%s object has no attribute %r" % (type(self).__name__, key))
| |
| |
class RouterProxy(object):
    """Builds management requests for router addresses and decodes the replies.

    :param reply_addr: address placed in ``reply_to`` of every request built.
    """

    _ADDRESS_TYPE = 'org.apache.qpid.dispatch.router.address'

    def __init__(self, reply_addr):
        self.reply_addr = reply_addr

    def _request(self, properties):
        """Wrap ``properties`` in a Message whose replies go to ``reply_addr``."""
        return Message(properties=properties, reply_to=self.reply_addr)

    def response(self, msg):
        """Convert a reply message into an Entity.

        The status code and description are read from the message properties
        and the message body becomes the Entity's attribute container.
        """
        props = msg.properties
        code = props['statusCode']
        description = props['statusDescription']
        return Entity(code, description, msg.body)

    def read_address(self, name):
        """Build a READ request for the router address entity called ``name``."""
        return self._request({'operation': 'READ', 'type': self._ADDRESS_TYPE, 'name': name})

    def query_addresses(self):
        """Build a QUERY request over router address entities."""
        return self._request({'operation': 'QUERY', 'type': self._ADDRESS_TYPE})
| |
| |
class Timeout(object):
    """Timer handler that forwards expiry to ``parent.timeout()``.

    Note: this definition rebinds the module-level name ``Timeout`` that is
    also imported from ``proton`` at the top of the file.
    """

    def __init__(self, parent):
        # Object whose timeout() method is called when the timer fires.
        self.parent = parent

    def on_timer_task(self, event):
        """Timer callback; ``event`` is ignored."""
        owner = self.parent
        owner.timeout()
| |
| |
class PollTimeout(object):
    """Timer handler that forwards expiry to ``parent.poll_timeout()``."""

    def __init__(self, parent):
        # Object whose poll_timeout() method is called when the timer fires.
        self.parent = parent

    def on_timer_task(self, event):
        """Timer callback; ``event`` is ignored."""
        owner = self.parent
        owner.poll_timeout()
| |
| |
class MessageTransferTest(MessagingHandler):
    """Send ``count`` messages over a sender link attached to a fixed address.

    Sequence of events:

    1. Three connections are opened (sender, receiver, lookup) and a dynamic
       reply receiver plus a ``$management`` sender are created on the lookup
       connection.
    2. Once the reply receiver is open, the test sender and receiver links
       are created on their addresses.
    3. Messages are sent as credit allows.  When all have been accepted, a
       READ for ``lookup_address`` is sent to the management agent.
    4. The management reply ends the test: a non-200 status or a mismatch
       between sent/received counts and ``count`` is recorded in ``error``.

    ``error`` stays None on success.
    """

    def __init__(self, sender_host, receiver_host, sender_address, receiver_address, lookup_host, lookup_address):
        super(MessageTransferTest, self).__init__()

        # Test parameters.
        self.sender_host, self.sender_address = sender_host, sender_address
        self.receiver_host, self.receiver_address = receiver_host, receiver_address
        self.lookup_host, self.lookup_address = lookup_host, lookup_address

        # Connections, links and helpers; populated once the container runs.
        self.sender_conn = self.receiver_conn = self.lookup_conn = None
        self.sender = self.receiver = None
        self.proxy = None
        self.error = None

        # Progress counters.
        self.count = 10
        self.n_sent = self.n_rcvd = self.n_accepted = 0
        self.n_receiver_opened = self.n_sender_opened = 0

    def _close_connections(self):
        """Close the sender, receiver and lookup connections, in that order."""
        for conn in (self.sender_conn, self.receiver_conn, self.lookup_conn):
            conn.close()

    def timeout(self):
        """Record a timeout error with all counters and shut the test down."""
        counters = (self.n_sent, self.n_rcvd, self.n_accepted, self.n_receiver_opened, self.n_sender_opened)
        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_accepted=%d n_receiver_opened=%d n_sender_opened=%d" % counters
        self._close_connections()

    def on_start(self, event):
        """Arm the timeout and open the connections and management links."""
        container = event.container
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.sender_conn = container.connect(self.sender_host)
        self.receiver_conn = container.connect(self.receiver_host)
        self.lookup_conn = container.connect(self.lookup_host)
        self.reply_receiver = container.create_receiver(self.lookup_conn, dynamic=True)
        self.agent_sender = container.create_sender(self.lookup_conn, "$management")

    def send(self):
        """Send messages until credit runs out or ``count`` have been sent."""
        total = self.count
        while self.sender.credit > 0 and self.n_sent < total:
            self.n_sent += 1
            self.sender.send(Message(body="Message %d of %d" % (self.n_sent, total)))

    def on_link_opened(self, event):
        """Count opened links; start the data links once the reply link is up."""
        link = event.receiver
        if link:
            self.n_receiver_opened += 1
        else:
            self.n_sender_opened += 1

        if link == self.reply_receiver:
            # The dynamic source address is where management replies arrive.
            self.proxy = RouterProxy(self.reply_receiver.remote_source.address)
            container = event.container
            self.sender = container.create_sender(self.sender_conn, self.sender_address)
            self.receiver = container.create_receiver(self.receiver_conn, self.receiver_address)

    def on_sendable(self, event):
        """Send more test messages when the test sender gets credit."""
        if event.sender == self.sender:
            self.send()

    def on_message(self, event):
        """Count test messages; finish the test on the management reply."""
        link = event.receiver
        if link == self.receiver:
            self.n_rcvd += 1

        if link == self.reply_receiver:
            reply = self.proxy.response(event.message)
            if reply.status_code != 200:
                self.error = "Unexpected error code from agent: %d - %s" % (reply.status_code, reply.status_description)
            if not (self.n_sent == self.count and self.n_rcvd == self.count):
                self.error = "Unexpected counts: n_sent=%d n_rcvd=%d n_accepted=%d" % (self.n_sent, self.n_rcvd, self.n_accepted)
            self._close_connections()
            self.timer.cancel()

    def on_accepted(self, event):
        """Count acceptances; after the last one, query the lookup address."""
        if event.sender == self.sender:
            self.n_accepted += 1
            if self.n_accepted == self.count:
                self.agent_sender.send(self.proxy.read_address(self.lookup_address))

    def run(self):
        """Run this handler in a fresh Container until it finishes."""
        Container(self).run()
| |
| |
class MessageTransferAnonTest(MessagingHandler):
    """Send ``count`` messages over an anonymous sender link.

    Sequence of events:

    1. Three connections are opened (sender, receiver, lookup); the test
       receiver and the management links are created immediately.
    2. Once the reply receiver is open, ``lookup_address`` is polled through
       the management agent every 0.25 s until the reply has status 200 and
       a positive ``remoteCount + subscriberCount``.
    3. An anonymous sender (no target address) is then created and each
       message carries ``sender_address`` in its ``address`` field.
    4. The test ends when ``count`` messages have been accepted.

    ``error`` stays None on success and is set by ``timeout()`` otherwise.
    """

    def __init__(self, sender_host, receiver_host, sender_address, receiver_address, lookup_host, lookup_address):
        super(MessageTransferAnonTest, self).__init__()

        # Test parameters.
        self.sender_host, self.sender_address = sender_host, sender_address
        self.receiver_host, self.receiver_address = receiver_host, receiver_address
        self.lookup_host, self.lookup_address = lookup_host, lookup_address

        # Connections, links and helpers; populated once the container runs.
        self.sender_conn = self.receiver_conn = self.lookup_conn = None
        self.sender = self.receiver = None
        self.proxy = None
        self.error = None

        # Progress counters.
        self.count = 10
        self.n_sent = self.n_rcvd = self.n_accepted = 0
        self.n_agent_reads = 0
        self.n_receiver_opened = self.n_sender_opened = 0

    def _close_connections(self):
        """Close the sender, receiver and lookup connections, in that order."""
        for conn in (self.sender_conn, self.receiver_conn, self.lookup_conn):
            conn.close()

    def timeout(self):
        """Record a timeout error with all counters and shut the test down."""
        counters = (self.n_sent, self.n_rcvd, self.n_accepted, self.n_agent_reads,
                    self.n_receiver_opened, self.n_sender_opened)
        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_accepted=%d n_agent_reads=%d n_receiver_opened=%d n_sender_opened=%d" % counters
        self._close_connections()
        if self.poll_timer:
            self.poll_timer.cancel()

    def poll_timeout(self):
        """Poll timer expired: ask the agent about the lookup address again."""
        self.poll()

    def on_start(self, event):
        """Arm the timeout and open connections, management links and receiver."""
        container = event.container
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.poll_timer = None
        self.sender_conn = container.connect(self.sender_host)
        self.receiver_conn = container.connect(self.receiver_host)
        self.lookup_conn = container.connect(self.lookup_host)
        self.reply_receiver = container.create_receiver(self.lookup_conn, dynamic=True)
        self.agent_sender = container.create_sender(self.lookup_conn, "$management")
        self.receiver = container.create_receiver(self.receiver_conn, self.receiver_address)

    def send(self):
        """Send addressed messages until credit runs out or ``count`` are sent."""
        total = self.count
        while self.sender.credit > 0 and self.n_sent < total:
            self.n_sent += 1
            msg = Message(body="Message %d of %d" % (self.n_sent, total))
            # The link has no target, so the destination travels in the message.
            msg.address = self.sender_address
            self.sender.send(msg)

    def poll(self):
        """Send one READ request for the lookup address and count it."""
        self.agent_sender.send(self.proxy.read_address(self.lookup_address))
        self.n_agent_reads += 1

    def on_link_opened(self, event):
        """Count opened links; start polling once the reply link is up."""
        link = event.receiver
        if link:
            self.n_receiver_opened += 1
        else:
            self.n_sender_opened += 1

        if link == self.reply_receiver:
            # The dynamic source address is where management replies arrive.
            self.proxy = RouterProxy(self.reply_receiver.remote_source.address)
            self.poll()

    def on_sendable(self, event):
        """Send more test messages when the anonymous sender gets credit."""
        if event.sender == self.sender:
            self.send()

    def on_message(self, event):
        """Count test messages; act on management replies."""
        link = event.receiver
        if link == self.receiver:
            self.n_rcvd += 1

        if link == self.reply_receiver:
            reply = self.proxy.response(event.message)
            # Counts are only read when the status is 200 (short-circuit).
            reachable = reply.status_code == 200 and (reply.remoteCount + reply.subscriberCount) > 0
            if not reachable:
                # Destination not known yet: try again shortly.
                self.poll_timer = event.reactor.schedule(0.25, PollTimeout(self))
            else:
                self.sender = event.container.create_sender(self.sender_conn, None)
                if self.poll_timer:
                    self.poll_timer.cancel()
                    self.poll_timer = None

    def on_accepted(self, event):
        """Count acceptances; finish the test after the last one."""
        if event.sender == self.sender:
            self.n_accepted += 1
            if self.n_accepted == self.count:
                self._close_connections()
                self.timer.cancel()

    def run(self):
        """Run this handler in a fresh Container until it finishes."""
        Container(self).run()
| |
| |
class LinkRouteTest(MessagingHandler):
    """Exercise a link route between a client connection and a route container.

    The handler runs in a container with id 'LRC'.  ``first_*`` is the client
    side that creates links; ``second_*`` is the side that receives the
    routed link attaches and validates their addresses.

    Sequence of events:

    1. Connections to ``first_host``, ``second_host`` and ``lookup_host`` are
       opened, plus management links on the lookup connection.
    2. Address "D0.0.0.0/link" is polled through the management agent every
       0.25 s until the reply has status 200 and a positive
       ``remoteCount + containerCount``.
    3. The client sender and receiver are created; with ``dynamic`` set the
       receiver uses a dynamic source carrying ``x-opt-qd.address``.
    4. Incoming attaches on the second side are checked against
       ``second_address``; a mismatch fails the test.
    5. The test ends when ``count`` deliveries have been settled.

    Credit is issued manually (``prefetch=0``).  ``error`` stays None on
    success.
    """

    def __init__(self, first_host, second_host, first_address, second_address, dynamic, lookup_host):
        super(LinkRouteTest, self).__init__(prefetch=0)

        # Test parameters.
        self.first_host, self.first_address = first_host, first_address
        self.second_host, self.second_address = second_host, second_address
        self.dynamic = dynamic
        self.lookup_host = lookup_host

        # Connections and links; populated once the container runs.
        self.first_conn = self.second_conn = None
        self.first_sender = self.first_receiver = None
        self.second_sender = self.second_receiver = None
        self.poll_timer = None
        self.error = None

        # Progress counters.
        self.count = 10
        self.n_sent = self.n_rcvd = self.n_settled = 0

    def timeout(self):
        """Record a timeout error with the counters and shut the test down."""
        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" % (self.n_sent, self.n_rcvd, self.n_settled)
        for conn in (self.first_conn, self.second_conn, self.lookup_conn):
            conn.close()
        if self.poll_timer:
            self.poll_timer.cancel()

    def poll_timeout(self):
        """Poll timer expired: ask the agent about the link route again."""
        self.poll()

    def fail(self, text):
        """End the test with ``error`` set to ``text`` (None means success)."""
        self.error = text
        self.second_conn.close()
        self.first_conn.close()
        self.timer.cancel()
        self.lookup_conn.close()
        if self.poll_timer:
            self.poll_timer.cancel()

    def send(self):
        """Send messages until credit runs out or ``count`` have been sent."""
        total = self.count
        while self.first_sender.credit > 0 and self.n_sent < total:
            self.n_sent += 1
            self.first_sender.send(Message(body="Message %d of %d" % (self.n_sent, total)))

    def poll(self):
        """Send one READ request for the link-route address."""
        self.agent_sender.send(self.proxy.read_address("D0.0.0.0/link"))

    def setup_first_links(self, event):
        """Create the client-side sender and receiver on the first connection."""
        container = event.container
        self.first_sender = container.create_sender(self.first_conn, self.first_address)
        if not self.dynamic:
            self.first_receiver = container.create_receiver(self.first_conn, self.first_address)
            return
        # Dynamic source: the wanted address is passed as a node property.
        node_properties = DynamicNodeProperties({"x-opt-qd.address": UNICODE(self.first_address)})
        self.first_receiver = container.create_receiver(self.first_conn,
                                                        dynamic=True,
                                                        options=node_properties)

    def on_start(self, event):
        """Arm the timeout and open the connections and management links."""
        container = event.container
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.first_conn = container.connect(self.first_host)
        self.second_conn = container.connect(self.second_host)
        self.lookup_conn = container.connect(self.lookup_host)
        self.reply_receiver = container.create_receiver(self.lookup_conn, dynamic=True)
        self.agent_sender = container.create_sender(self.lookup_conn, "$management")

    def on_link_opening(self, event):
        """Validate a remotely initiated link and open it, or fail the test."""
        expected = self.second_address

        outgoing = event.sender
        if outgoing:
            self.second_sender = outgoing
            if self.dynamic:
                if not outgoing.remote_source.dynamic:
                    self.fail("Expected dynamic source on sender")
                    return
            elif not (outgoing.remote_source.address == expected):
                self.fail("Incorrect address on incoming sender: got %s, expected %s" %
                          (outgoing.remote_source.address, expected))
                return
            outgoing.source.address = expected
            outgoing.open()
            return

        incoming = event.receiver
        if incoming:
            self.second_receiver = incoming
            if not (incoming.remote_target.address == expected):
                self.fail("Incorrect address on incoming receiver: got %s, expected %s" %
                          (incoming.remote_target.address, expected))
                return
            incoming.target.address = expected
            incoming.open()

    def on_link_opened(self, event):
        """Grant credit to every opened receiver; start polling on the reply link."""
        link = event.receiver
        if link:
            # prefetch is 0, so credit must be issued explicitly.
            link.flow(self.count)

        if link == self.reply_receiver:
            # The dynamic source address is where management replies arrive.
            self.proxy = RouterProxy(self.reply_receiver.remote_source.address)
            self.poll()

    def on_sendable(self, event):
        """Send more test messages when the client sender gets credit."""
        if event.sender == self.first_sender:
            self.send()

    def on_message(self, event):
        """Count routed messages; act on management replies."""
        link = event.receiver
        if link == self.first_receiver:
            self.n_rcvd += 1

        if link == self.reply_receiver:
            reply = self.proxy.response(event.message)
            # Counts are only read when the status is 200 (short-circuit).
            route_ready = reply.status_code == 200 and (reply.remoteCount + reply.containerCount) > 0
            if not route_ready:
                # Link route not active yet: try again shortly.
                self.poll_timer = event.reactor.schedule(0.25, PollTimeout(self))
            else:
                if self.poll_timer:
                    self.poll_timer.cancel()
                    self.poll_timer = None
                self.setup_first_links(event)

    def on_settled(self, event):
        """Count settlements; finish successfully after the last one."""
        if event.sender == self.first_sender:
            self.n_settled += 1
            if self.n_settled == self.count:
                self.fail(None)

    def run(self):
        """Run this handler in a Container whose id is 'LRC'."""
        runner = Container(self)
        runner.container_id = 'LRC'
        runner.run()
| |
| |
class WaypointTest(MessagingHandler):
    """Route messages through a waypoint that this handler emulates.

    ``first_*`` is the client side: it sends ``count`` messages to
    ``first_address`` and receives from the same address.  ``second_*`` is
    the waypoint side: links attached to it by the peer are validated against
    ``second_address``; messages arriving there are buffered in
    ``waypoint_queue`` and sent back out on the waypoint sender.

    The test passes when the client receiver has seen ``count`` messages and
    ``count`` messages went through the waypoint.  ``error`` stays None on
    success.

    :param first_host: address of the client connection.
    :param second_host: address of the waypoint connection.
    :param first_address: address used by the client sender and receiver.
    :param second_address: address expected on the waypoint links.
    :param container_id: container id this handler runs under (default "ALC").
    """

    def __init__(self, first_host, second_host, first_address, second_address, container_id="ALC"):
        super(WaypointTest, self).__init__()
        self.first_host = first_host
        self.second_host = second_host
        self.first_address = first_address
        self.second_address = second_address
        self.container_id = container_id

        self.first_conn = None
        self.second_conn = None
        self.error = None
        self.first_sender = None
        self.first_receiver = None
        self.waypoint_sender = None
        self.waypoint_receiver = None
        # Messages received on the waypoint and not yet forwarded (oldest first).
        self.waypoint_queue = []

        self.count = 10
        self.n_sent = 0
        self.n_rcvd = 0
        self.n_thru = 0

    def timeout(self):
        """Record a timeout error with the counters and close both connections."""
        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_thru=%d" % (self.n_sent, self.n_rcvd, self.n_thru)
        self.first_conn.close()
        self.second_conn.close()

    def fail(self, text):
        """End the test with ``error`` set to ``text`` (None means success)."""
        self.error = text
        self.second_conn.close()
        self.first_conn.close()
        self.timer.cancel()

    def send_client(self):
        """Send client messages until credit runs out or ``count`` are sent."""
        while self.first_sender.credit > 0 and self.n_sent < self.count:
            self.n_sent += 1
            m = Message(body="Message %d of %d" % (self.n_sent, self.count))
            self.first_sender.send(m)

    def send_waypoint(self):
        """Forward buffered waypoint messages, oldest first, while credit lasts."""
        # Messages can arrive on the waypoint receiver before the waypoint
        # sender link has been attached.  Keep them buffered; on_sendable
        # drains the queue once that link exists and has credit.
        if self.waypoint_sender is None:
            return
        while self.waypoint_sender.credit > 0 and len(self.waypoint_queue) > 0:
            self.n_thru += 1
            # pop(0) keeps arrival order (FIFO); a bare pop() would forward
            # the newest message first whenever several are buffered.
            m = self.waypoint_queue.pop(0)
            self.waypoint_sender.send(m)

    def on_start(self, event):
        """Arm the timeout and open the client and waypoint connections."""
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.first_conn = event.container.connect(self.first_host)
        self.second_conn = event.container.connect(self.second_host)

    def on_connection_opened(self, event):
        """Create the client links once the client connection is open."""
        if event.connection == self.first_conn:
            self.first_sender = event.container.create_sender(self.first_conn, self.first_address)
            self.first_receiver = event.container.create_receiver(self.first_conn, self.first_address)

    def on_link_opening(self, event):
        """Validate a remotely initiated waypoint link and open it, or fail."""
        if event.sender:
            self.waypoint_sender = event.sender
            if event.sender.remote_source.address == self.second_address:
                event.sender.source.address = self.second_address
                event.sender.open()
            else:
                self.fail("Incorrect address on incoming sender: got %s, expected %s" %
                          (event.sender.remote_source.address, self.second_address))

        elif event.receiver:
            self.waypoint_receiver = event.receiver
            if event.receiver.remote_target.address == self.second_address:
                event.receiver.target.address = self.second_address
                event.receiver.open()
            else:
                self.fail("Incorrect address on incoming receiver: got %s, expected %s" %
                          (event.receiver.remote_target.address, self.second_address))

    def on_sendable(self, event):
        """Send on whichever of the two senders just became sendable."""
        if event.sender == self.first_sender:
            self.send_client()
        elif event.sender == self.waypoint_sender:
            self.send_waypoint()

    def on_message(self, event):
        """Count client deliveries; buffer and forward waypoint deliveries."""
        if event.receiver == self.first_receiver:
            self.n_rcvd += 1
            if self.n_rcvd == self.count and self.n_thru == self.count:
                self.fail(None)
        elif event.receiver == self.waypoint_receiver:
            # Forward a fresh message carrying only the body.
            m = Message(body=event.message.body)
            self.waypoint_queue.append(m)
            self.send_waypoint()

    def run(self):
        """Run this handler in a Container using the configured container id."""
        container = Container(self)
        container.container_id = self.container_id
        container.run()
| |
| |
# Script entry point: run this module's tests through unittest.
# NOTE(review): main_module() comes from system_test; presumably it names the
# module unittest should load -- confirm against system_test.
if __name__ == '__main__':
    unittest.main(main_module())