| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import time |
| |
| from proton import Message, symbol |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container |
| |
| from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout |
| from system_test import unittest, skip_test_in_ci |
| from system_test import Logger |
| |
| |
| class AddrTimer: |
| def __init__(self, parent): |
| self.parent = parent |
| |
| def on_timer_task(self, event): |
| self.parent.check_address() |
| |
| |
| class RouterTest(TestCase): |
| |
| inter_router_port = None |
| |
| @classmethod |
| def setUpClass(cls): |
| """Start a router""" |
| super(RouterTest, cls).setUpClass() |
| |
| def router(name, mode, connection, extra=None): |
| config = [ |
| ('router', {'mode': mode, 'id': name}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), |
| ('listener', {'port': cls.tester.get_port(), 'role': 'route-container', 'name': 'WP'}), |
| ('address', {'prefix': 'dest', 'enableFallback': 'yes'}), |
| ('autoLink', {'connection': 'WP', 'address': 'dest.al', 'direction': 'out', 'fallback': 'yes'}), |
| ('autoLink', {'connection': 'WP', 'address': 'dest.al', 'direction': 'in', 'fallback': 'yes'}), |
| connection |
| ] |
| |
| if extra: |
| config.append(extra) |
| config = Qdrouterd.Config(config) |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) |
| |
| cls.routers = [] |
| |
| inter_router_port = cls.tester.get_port() |
| edge_port_A = cls.tester.get_port() |
| edge_port_B = cls.tester.get_port() |
| |
| router('INT.A', 'interior', ('listener', {'role': 'inter-router', 'port': inter_router_port}), |
| ('listener', {'role': 'edge', 'port': edge_port_A})) |
| router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}), |
| ('listener', {'role': 'edge', 'port': edge_port_B})) |
| router('EA1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) |
| router('EA2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) |
| router('EB1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) |
| router('EB2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) |
| |
| cls.routers[0].wait_router_connected('INT.B') |
| cls.routers[1].wait_router_connected('INT.A') |
| |
| cls.ROUTER_INTA = cls.routers[0].addresses[0] |
| cls.ROUTER_INTB = cls.routers[1].addresses[0] |
| cls.ROUTER_EA1 = cls.routers[2].addresses[0] |
| cls.ROUTER_EA2 = cls.routers[3].addresses[0] |
| cls.ROUTER_EB1 = cls.routers[4].addresses[0] |
| cls.ROUTER_EB2 = cls.routers[5].addresses[0] |
| |
| cls.ROUTER_INTA_WP = cls.routers[0].addresses[1] |
| cls.ROUTER_INTB_WP = cls.routers[1].addresses[1] |
| cls.ROUTER_EA1_WP = cls.routers[2].addresses[1] |
| cls.ROUTER_EA2_WP = cls.routers[3].addresses[1] |
| cls.ROUTER_EB1_WP = cls.routers[4].addresses[1] |
| cls.ROUTER_EB2_WP = cls.routers[5].addresses[1] |
| |
| def test_01_sender_first_primary_same_interior(self): |
| test = SenderFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTA, |
| 'dest.01', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_02_sender_first_fallback_same_interior(self): |
| test = SenderFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTA, |
| 'dest.02', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_03_sender_first_primary_same_edge(self): |
| test = SenderFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EA1, |
| 'dest.03', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_04_sender_first_fallback_same_edge(self): |
| test = SenderFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EA1, |
| 'dest.04', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_05_sender_first_primary_interior_interior(self): |
| test = SenderFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTB, |
| 'dest.05', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_06_sender_first_fallback_interior_interior(self): |
| test = SenderFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTB, |
| 'dest.06', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_07_sender_first_primary_edge_interior(self): |
| test = SenderFirstTest(self.ROUTER_EA1, |
| self.ROUTER_INTB, |
| 'dest.07', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_08_sender_first_fallback_edge_interior(self): |
| test = SenderFirstTest(self.ROUTER_EA1, |
| self.ROUTER_INTB, |
| 'dest.08', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_09_sender_first_primary_interior_edge(self): |
| test = SenderFirstTest(self.ROUTER_INTB, |
| self.ROUTER_EA1, |
| 'dest.09', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_10_sender_first_fallback_interior_edge(self): |
| test = SenderFirstTest(self.ROUTER_INTB, |
| self.ROUTER_EA1, |
| 'dest.10', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_11_sender_first_primary_edge_edge(self): |
| test = SenderFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EB1, |
| 'dest.11', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_12_sender_first_fallback_edge_edge(self): |
| test = SenderFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EB1, |
| 'dest.12', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_13_receiver_first_primary_same_interior(self): |
| test = ReceiverFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTA, |
| 'dest.13', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_14_receiver_first_fallback_same_interior(self): |
| test = ReceiverFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTA, |
| 'dest.14', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_15_receiver_first_primary_same_edge(self): |
| test = ReceiverFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EA1, |
| 'dest.15', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_16_receiver_first_fallback_same_edge(self): |
| test = ReceiverFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EA1, |
| 'dest.16', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_17_receiver_first_primary_interior_interior(self): |
| test = ReceiverFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTB, |
| 'dest.17', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_18_receiver_first_fallback_interior_interior(self): |
| test = ReceiverFirstTest(self.ROUTER_INTA, |
| self.ROUTER_INTB, |
| 'dest.18', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_19_receiver_first_primary_edge_interior(self): |
| test = ReceiverFirstTest(self.ROUTER_EA1, |
| self.ROUTER_INTB, |
| 'dest.19', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_20_receiver_first_fallback_edge_interior(self): |
| test = ReceiverFirstTest(self.ROUTER_EA1, |
| self.ROUTER_INTB, |
| 'dest.20', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_21_receiver_first_primary_interior_edge(self): |
| test = ReceiverFirstTest(self.ROUTER_INTB, |
| self.ROUTER_EA1, |
| 'dest.21', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_22_receiver_first_fallback_interior_edge(self): |
| test = ReceiverFirstTest(self.ROUTER_INTB, |
| self.ROUTER_EA1, |
| 'dest.22', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_23_receiver_first_primary_edge_edge(self): |
| test = ReceiverFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EB1, |
| 'dest.23', False) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_24_receiver_first_fallback_edge_edge(self): |
| test = ReceiverFirstTest(self.ROUTER_EA1, |
| self.ROUTER_EB1, |
| 'dest.24', True) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| skip_reason = 'Test skipped until switchover use case resolved' |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_25_switchover_same_edge(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_EA1, "EA1"], |
| 'dest.25') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_26_switchover_same_interior(self): |
| test = SwitchoverTest([self.ROUTER_INTA, "INTA"], |
| [self.ROUTER_INTA, "INTA"], |
| [self.ROUTER_INTA, "INTA"], |
| 'dest.26') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_27_switchover_local_edge_alt_remote_interior(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_INTA, "INTA"], |
| [self.ROUTER_EA1, "EA1"], |
| 'dest.27') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_28_switchover_local_edge_alt_remote_edge(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_EB1, "EB1"], |
| [self.ROUTER_EA1, "EA1"], |
| 'dest.28') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_29_switchover_local_edge_pri_remote_interior(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_INTA, "INTA"], |
| 'dest.29') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_30_switchover_local_interior_pri_remote_edge(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_EB1, "EB1"], |
| 'dest.30') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_31_switchover_local_interior_alt_remote_interior(self): |
| test = SwitchoverTest([self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_INTA, "INTA"], |
| [self.ROUTER_INTB, "INTB"], |
| 'dest.31') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_32_switchover_local_interior_alt_remote_edge(self): |
| test = SwitchoverTest([self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_EA2, "EA2"], |
| [self.ROUTER_INTB, "INTB"], |
| 'dest.32') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_33_switchover_local_interior_pri_remote_interior(self): |
| test = SwitchoverTest([self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_INTA, "INTA"], |
| 'dest.33') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_34_switchover_local_interior_pri_remote_edge(self): |
| test = SwitchoverTest([self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_EB1, "EB1"], |
| 'dest.34') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_35_switchover_mix_1(self): |
| test = SwitchoverTest([self.ROUTER_INTA, "INTA"], |
| [self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_EA1, "EA1"], |
| 'dest.35') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_36_switchover_mix_2(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_INTA, "INTA"], |
| 'dest.36') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_37_switchover_mix_3(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_INTB, "INTB"], |
| [self.ROUTER_EB1, "EB1"], |
| 'dest.37') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| @unittest.skipIf(skip_test_in_ci('QPID_SYSTEM_TEST_SKIP_FALLBACK_SWITCHOVER_TEST'), skip_reason) |
| def test_38_switchover_mix_4(self): |
| test = SwitchoverTest([self.ROUTER_EA1, "EA1"], |
| [self.ROUTER_EA2, "EA2"], |
| [self.ROUTER_EB1, "EB1"], |
| 'dest.38') |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_39_auto_link_sender_first_fallback_same_interior(self): |
| test = SenderFirstAutoLinkTest(self.ROUTER_INTA, |
| self.ROUTER_INTA_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_40_auto_link_sender_first_fallback_same_edge(self): |
| test = SenderFirstAutoLinkTest(self.ROUTER_EA1, |
| self.ROUTER_EA1_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_41_auto_link_sender_first_fallback_interior_interior(self): |
| test = SenderFirstAutoLinkTest(self.ROUTER_INTA, |
| self.ROUTER_INTB_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_42_auto_link_sender_first_fallback_edge_interior(self): |
| test = SenderFirstAutoLinkTest(self.ROUTER_EA1, |
| self.ROUTER_INTA_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_43_auto_link_sender_first_fallback_interior_edge(self): |
| test = SenderFirstAutoLinkTest(self.ROUTER_INTB, |
| self.ROUTER_EA1_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_44_auto_link_sender_first_fallback_edge_edge(self): |
| test = SenderFirstAutoLinkTest(self.ROUTER_EA1, |
| self.ROUTER_EB1_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_45_auto_link_receiver_first_fallback_same_interior(self): |
| test = ReceiverFirstAutoLinkTest(self.ROUTER_INTA, |
| self.ROUTER_INTA_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_46_auto_link_receiver_first_fallback_same_edge(self): |
| test = ReceiverFirstAutoLinkTest(self.ROUTER_EA1, |
| self.ROUTER_EA1_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_47_auto_link_receiver_first_fallback_interior_interior(self): |
| test = ReceiverFirstAutoLinkTest(self.ROUTER_INTA, |
| self.ROUTER_INTB_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_48_auto_link_receiver_first_fallback_edge_interior(self): |
| test = ReceiverFirstAutoLinkTest(self.ROUTER_EA1, |
| self.ROUTER_INTB_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_49_auto_link_receiver_first_fallback_interior_edge(self): |
| test = ReceiverFirstAutoLinkTest(self.ROUTER_INTB, |
| self.ROUTER_EA1_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| def test_50_auto_link_receiver_first_fallback_edge_edge(self): |
| test = ReceiverFirstAutoLinkTest(self.ROUTER_EA1, |
| self.ROUTER_EB1_WP) |
| test.run() |
| self.assertIsNone(test.error) |
| |
| |
| class SenderFirstTest(MessagingHandler): |
| def __init__(self, sender_host, receiver_host, addr, rx_fallback): |
| super(SenderFirstTest, self).__init__() |
| self.sender_host = sender_host |
| self.receiver_host = receiver_host |
| self.addr = addr |
| self.rx_fallback = rx_fallback |
| self.count = 300 |
| |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.error = None |
| self.n_tx = 0 |
| self.n_rx = 0 |
| self.n_rel = 0 |
| self.tx_seq = 0 |
| |
| def timeout(self): |
| self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def fail(self, error): |
| self.error = error |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.sender_conn = event.container.connect(self.sender_host) |
| self.receiver_conn = event.container.connect(self.receiver_host) |
| self.sender = event.container.create_sender(self.sender_conn, self.addr, name=(self.addr + "_sender")) |
| |
| def on_link_opened(self, event): |
| if event.sender == self.sender: |
| rname = self.addr + "_receiver_fallback_" + ("true" if self.rx_fallback else "false") |
| self.receiver = event.container.create_receiver(self.receiver_conn, self.addr, name=rname) |
| if self.rx_fallback: |
| self.receiver.source.capabilities.put_symbol("qd.fallback") |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| while self.sender.credit > 0 and self.n_tx < self.count: |
| self.sender.send(Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx))) |
| self.n_tx += 1 |
| self.tx_seq += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.n_rx += 1 |
| if self.n_rx == self.count: |
| self.fail(None) |
| |
| def on_released(self, event): |
| self.n_rel += 1 |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class ReceiverFirstTest(MessagingHandler): |
| def __init__(self, sender_host, receiver_host, addr, rx_fallback): |
| super(ReceiverFirstTest, self).__init__() |
| self.sender_host = sender_host |
| self.receiver_host = receiver_host |
| self.addr = addr |
| self.rx_fallback = rx_fallback |
| self.count = 300 |
| |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.error = None |
| self.n_tx = 0 |
| self.n_rx = 0 |
| self.n_rel = 0 |
| self.tx_seq = 0 |
| |
| def timeout(self): |
| self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def fail(self, error): |
| self.error = error |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.sender_conn = event.container.connect(self.sender_host) |
| self.receiver_conn = event.container.connect(self.receiver_host) |
| rname = self.addr + "_receiver_fallback_" + ("true" if self.rx_fallback else "false") |
| self.receiver = event.container.create_receiver(self.receiver_conn, self.addr, name=rname) |
| if self.rx_fallback: |
| self.receiver.source.capabilities.put_symbol("qd.fallback") |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.sender = event.container.create_sender(self.sender_conn, self.addr, name=(self.addr + "_sender")) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| while self.sender.credit > 0 and self.n_tx < self.count: |
| self.sender.send(Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx))) |
| self.n_tx += 1 |
| self.tx_seq += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.n_rx += 1 |
| if self.n_rx == self.count: |
| self.fail(None) |
| |
| def on_released(self, event): |
| self.n_rel += 1 |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class SwitchoverTest(MessagingHandler): |
| def __init__(self, sender_host, primary_host, fallback_host, addr): |
| super(SwitchoverTest, self).__init__() |
| self.sender_host = sender_host[0] |
| self.primary_host = primary_host[0] |
| self.fallback_host = fallback_host[0] |
| self.sender_name = sender_host[1] |
| self.primary_name = primary_host[1] |
| self.fallback_name = fallback_host[1] |
| self.addr = addr |
| self.count = 300 |
| |
| # DISPATCH-2213 back off on logging. |
| self.log_sends = 100 # every 100th send |
| self.log_recvs = 100 # every 100th receive |
| self.log_released = 100 # every 100th sender released |
| |
| self.sender_conn = None |
| self.primary_conn = None |
| self.fallback_conn = None |
| self.primary_open = False |
| self.fallback_open = False |
| self.error = None |
| self.n_tx = 0 |
| self.n_rx = 0 |
| self.n_rel = 0 |
| self.phase = 0 |
| self.tx_seq = 0 |
| self.local_rel = 0 |
| |
| self.log_prefix = "FALLBACK_TEST %s" % self.addr |
| self.logger = Logger("SwitchoverTest_%s" % addr, print_to_console=False) |
| # Prepend a convenience SERVER line for scraper tool. |
| # Then the logs from this test can be merged with the router logs in scraper. |
| self.logger.log("SERVER (info) Container Name: %s" % self.addr) |
| self.logger.log("%s SwitchoverTest sender:%s primary:%s fallback:%s" % |
| (self.log_prefix, self.sender_name, self.primary_name, self.fallback_name)) |
| |
| def timeout(self): |
| self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d, phase=%d, local_rel=%d" % \ |
| (self.n_tx, self.n_rx, self.n_rel, self.phase, self.local_rel) |
| self.sender_conn.close() |
| self.primary_conn.close() |
| self.fallback_conn.close() |
| |
| def fail(self, error): |
| self.error = error |
| self.sender_conn.close() |
| self.primary_conn.close() |
| self.fallback_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.logger.log("%s Opening sender connection to %s" % (self.log_prefix, self.sender_name)) |
| self.sender_conn = event.container.connect(self.sender_host) |
| self.logger.log("%s Opening primary receiver connection to %s" % (self.log_prefix, self.primary_name)) |
| self.primary_conn = event.container.connect(self.primary_host) |
| self.logger.log("%s Opening fallback receiver connection to %s" % (self.log_prefix, self.fallback_name)) |
| self.fallback_conn = event.container.connect(self.fallback_host) |
| self.logger.log("%s Opening primary receiver to %s" % (self.log_prefix, self.primary_name)) |
| self.primary_receiver = event.container.create_receiver(self.primary_conn, self.addr, name=(self.addr + "_primary_receiver")) |
| self.logger.log("%s Opening fallback receiver to %s" % (self.log_prefix, self.fallback_name)) |
| self.fallback_receiver = event.container.create_receiver(self.fallback_conn, self.addr, name=(self.addr + "fallback_receiver")) |
| self.fallback_receiver.source.capabilities.put_object(symbol("qd.fallback")) |
| |
| def on_link_opened(self, event): |
| receiver_event = False |
| if event.receiver == self.primary_receiver: |
| self.logger.log("%s Primary receiver opened" % self.log_prefix) |
| self.primary_open = True |
| receiver_event = True |
| if event.receiver == self.fallback_receiver: |
| self.logger.log("%s Fallback receiver opened" % self.log_prefix) |
| self.fallback_open = True |
| receiver_event = True |
| if receiver_event and self.primary_open and self.fallback_open: |
| self.logger.log("%s Opening sender to %s" % (self.log_prefix, self.sender_name)) |
| self.sender = event.container.create_sender(self.sender_conn, self.addr, name=(self.addr + "_sender")) |
| |
| def on_link_closed(self, event): |
| if event.receiver == self.primary_receiver: |
| self.logger.log("%s Primary receiver closed. Start phase 1 send" % self.log_prefix) |
| self.n_rx = 0 |
| self.n_tx = 0 |
| self.send() |
| |
| def send(self): |
| e_credit = self.sender.credit |
| e_n_tx = self.n_tx |
| e_tx_seq = self.tx_seq |
| last_message = Message("None") |
| while self.sender.credit > 0 and self.n_tx < self.count and not self.sender.drain_mode: |
| last_message = Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx)) |
| self.sender.send(last_message) |
| self.n_tx += 1 |
| self.tx_seq += 1 |
| if self.sender.drain_mode: |
| n_drained = self.sender.drained() |
| self.logger.log("%s sender.drained() drained %d credits" % (self.log_prefix, n_drained)) |
| if self.n_tx > e_n_tx and self.n_tx % self.log_sends == 0: # if sent then log every Nth message |
| self.logger.log("%s send() exit: last sent '%s' phase=%d, credit=%3d->%3d, n_tx=%4d->%4d, tx_seq=%4d->%4d, n_rel=%4d" % |
| (self.log_prefix, last_message.body, self.phase, e_credit, self.sender.credit, |
| e_n_tx, self.n_tx, e_tx_seq, self.tx_seq, self.n_rel)) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| self.send() |
| else: |
| self.fail("%s on_sendable event not from the only sender") |
| |
| def on_message(self, event): |
| if event.receiver == self.primary_receiver: |
| if self.phase == 0: |
| self.n_rx += 1 |
| if self.n_rx % self.log_recvs == 0: |
| self.logger.log("%s Received phase 0 message '%s', n_rx=%d" % |
| (self.log_prefix, event.message.body, self.n_rx)) |
| if self.n_rx == self.count: |
| self.logger.log("%s Triggering fallback by closing primary receiver on %s. Test phase 0->1." % |
| (self.log_prefix, self.primary_name)) |
| self.phase = 1 |
| self.primary_receiver.close() |
| else: |
| # Phase 1 messages are unexpected on primary receiver |
| self.logger.log("%s Phase %d message received on primary: '%s'" % (self.log_prefix, self.phase, event.message.body)) |
| self.fail("Receive phase1 message on primary receiver") |
| elif event.receiver == self.fallback_receiver: |
| if self.phase == 0: |
| # Phase 0 message over fallback receiver. This may happen because |
| # primary receiver is on a distant router and the fallback receiver is local. |
| # Release the message to keep trying until the primary receiver kicks in. |
| self.release(event.delivery) |
| self.n_rel += 1 |
| self.n_tx -= 1 |
| self.local_rel += 1 |
| if self.local_rel % self.log_recvs == 0: |
| self.logger.log("%s Released phase 0 over fallback: msg:'%s', n_rx=%d, n_tx=%d, n_rel=%d, local_rel=%d" % |
| (self.log_prefix, event.message.body, self.n_rx, self.n_tx, self.n_rel, self.local_rel)) |
| time.sleep(0.02) |
| else: |
| self.n_rx += 1 |
| if self.n_rx % self.log_recvs == 0: |
| self.logger.log("%s Received phase 1 over fallback: msg:'%s', n_rx=%d" % |
| (self.log_prefix, event.message.body, self.n_rx)) |
| if self.n_rx == self.count: |
| self.logger.log("%s Success" % self.log_prefix) |
| self.fail(None) |
| else: |
| self.fail("%s message received on unidentified receiver" % self.addr) |
| |
| def on_released(self, event): |
| # event type pn_delivery for sender |
| self.n_rel += 1 |
| self.n_tx -= 1 |
| if self.n_rel % self.log_released == 0: |
| self.logger.log("%s on_released: sender delivery was released. Adjusted counts: n_rel=%d, n_tx=%d" % |
| (self.log_prefix, self.n_rel, self.n_tx)) |
| if event.sender is None: |
| self.fail("on_released event not related to sender") |
| |
| def run(self): |
| Container(self).run() |
| if self.error is not None: |
| self.logger.dump() |
| |
| |
| class SenderFirstAutoLinkTest(MessagingHandler): |
| def __init__(self, sender_host, receiver_host): |
| super(SenderFirstAutoLinkTest, self).__init__() |
| self.sender_host = sender_host |
| self.receiver_host = receiver_host |
| self.addr = "dest.al" |
| self.count = 300 |
| |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.error = None |
| self.n_tx = 0 |
| self.n_rx = 0 |
| self.n_rel = 0 |
| self.tx_seq = 0 |
| |
| def timeout(self): |
| self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def fail(self, error): |
| self.error = error |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.sender_conn = event.container.connect(self.sender_host) |
| self.sender = event.container.create_sender(self.sender_conn, self.addr, name=(self.addr + "_sender")) |
| |
| def on_link_opening(self, event): |
| if event.sender: |
| self.alt_sender = event.sender |
| event.sender.source.address = self.addr |
| event.sender.open() |
| |
| elif event.receiver: |
| self.alt_receiver = event.receiver |
| event.receiver.target.address = self.addr |
| event.receiver.open() |
| |
| def on_link_opened(self, event): |
| if event.sender == self.sender: |
| self.receiver_conn = event.container.connect(self.receiver_host) |
| |
| def send(self): |
| while self.sender.credit > 0 and self.n_tx < self.count: |
| self.sender.send(Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx))) |
| self.n_tx += 1 |
| self.tx_seq += 1 |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| self.send() |
| |
| def on_message(self, event): |
| self.n_rx += 1 |
| if self.n_rx == self.count: |
| self.fail(None) |
| |
| def on_released(self, event): |
| self.n_rel += 1 |
| self.n_tx -= 1 |
| self.send() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class ReceiverFirstAutoLinkTest(MessagingHandler): |
| def __init__(self, sender_host, receiver_host): |
| super(ReceiverFirstAutoLinkTest, self).__init__() |
| self.sender_host = sender_host |
| self.receiver_host = receiver_host |
| self.addr = "dest.al" |
| self.count = 300 |
| |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.alt_receiver = None |
| self.error = None |
| self.n_tx = 0 |
| self.n_rx = 0 |
| self.n_rel = 0 |
| self.tx_seq = 0 |
| |
| def timeout(self): |
| self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def fail(self, error): |
| self.error = error |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.receiver_conn = event.container.connect(self.receiver_host) |
| |
| def on_link_opening(self, event): |
| if event.sender: |
| self.alt_sender = event.sender |
| event.sender.source.address = self.addr |
| event.sender.open() |
| |
| elif event.receiver: |
| self.alt_receiver = event.receiver |
| event.receiver.target.address = self.addr |
| event.receiver.open() |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.alt_receiver and not self.sender_conn: |
| self.sender_conn = event.container.connect(self.sender_host) |
| self.sender = event.container.create_sender(self.sender_conn, self.addr) |
| |
| def send(self): |
| while self.sender.credit > 0 and self.n_tx < self.count: |
| self.sender.send(Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx))) |
| self.n_tx += 1 |
| self.tx_seq += 1 |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| self.send() |
| |
| def on_message(self, event): |
| self.n_rx += 1 |
| if self.n_rx == self.count: |
| self.fail(None) |
| |
| def on_released(self, event): |
| self.n_rel += 1 |
| self.n_tx -= 1 |
| self.send() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| if __name__ == '__main__': |
| unittest.main(main_module()) |