| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import unicode_literals |
| from __future__ import division |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| from proton import Message, Timeout |
| from system_test import AsyncTestReceiver |
| from system_test import TestCase, Qdrouterd, main_module |
| from system_test import TIMEOUT |
| from system_test import unittest |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container |
| |
| import time |
| |
| #------------------------------------------------ |
| # Helper classes for all tests. |
| #------------------------------------------------ |
| |
class Timeout(object):
    """
    Timer callback that carries a label.

    Several instances can be scheduled at the same time against one
    owner; when an instance fires it reports its own label to the owner,
    which lets the owner tell the timers apart.

    NOTE: this class reuses the name `Timeout`, which this module also
    imports from proton; from this definition onward the name refers to
    this class.
    """

    def __init__(self, parent, name):
        # parent: object exposing a timeout(name) method.
        # name:   label handed back to the parent when this timer fires.
        self.name = name
        self.parent = parent

    def on_timer_task(self, event):
        """Tell the parent that the timer identified by `self.name` fired."""
        owner = self.parent
        owner.timeout(self.name)
| |
| |
| |
class ManagementMessageHelper(object):
    """
    Builds the management request messages used by the tests.

    Every request is created with the reply address supplied at
    construction time as its reply-to.
    """

    # Entity type named in every connector request.
    _CONNECTOR_TYPE = 'org.apache.qpid.dispatch.connector'

    def __init__(self, reply_addr):
        self.reply_addr = reply_addr

    def _connector_request(self, operation, connector_name):
        """Build a request applying `operation` to the named connector."""
        request_props = {'operation': operation,
                         'type': self._CONNECTOR_TYPE,
                         'name': connector_name}
        return Message(properties=request_props, reply_to=self.reply_addr)

    def make_connector_query(self, connector_name):
        """Return a READ request for the connector called `connector_name`."""
        return self._connector_request('READ', connector_name)

    def make_connector_delete_command(self, connector_name):
        """Return a DELETE request for the connector called `connector_name`."""
        return self._connector_request('DELETE', connector_name)
| |
| |
| #------------------------------------------------ |
| # END Helper classes for all tests. |
| #------------------------------------------------ |
| |
| |
| |
| |
| |
| #================================================================ |
| # Setup |
| #================================================================ |
| |
class TopologyTests(TestCase):
    """
    Starts a four-router mesh (A, B, C, D) once for the whole class and
    runs the topology failover test against it.
    """

    @classmethod
    def setUpClass(cls):
        """Start routers A, B, C and D and wait until A is connected to the rest."""
        super(TopologyTests, cls).setUpClass()

        cls.routers = []

        def router(name, more_config):
            # Entities shared by every router, followed by that router's
            # own listeners and connectors.
            shared_entities = [
                ('router', {'mode': 'interior', 'id': name}),
                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
            ]
            config = Qdrouterd.Config(shared_entities + more_config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

        def client_listener(port):
            # Listener that the test clients attach to.
            return ('listener', {'port': port,
                                 'role': 'normal',
                                 'stripAnnotations': 'no'})

        def inter_router_listener(port):
            # Listener that other routers connect in to.
            return ('listener', {'role': 'inter-router',
                                 'port': port,
                                 'stripAnnotations': 'no'})

        def connector(name, port, cost):
            # Named, costed inter-router connection to the given port.
            return ('connector', {'name': name,
                                  'role': 'inter-router',
                                  'port': port,
                                  'cost': cost,
                                  'stripAnnotations': 'no'})

        # Ports are requested in a fixed order: the four client ports
        # first, then the three inter-router ports.
        (A_client_port,
         B_client_port,
         C_client_port,
         D_client_port) = [cls.tester.get_port() for _ in range(4)]

        (A_inter_router_port,
         B_inter_router_port,
         C_inter_router_port) = [cls.tester.get_port() for _ in range(3)]

        #
        #  Topology of the 4-mesh, with costs of connections marked.
        #  Tail of arrow indicates initiator of connection.
        #
        #                  1
        #         D ----------> A
        #         | \         > ^
        #         | 20\   50/   |
        #         |     \ /     |
        #      1  |     / \     | 100
        #         |   /     \   |
        #         v /         > |
        #         C ----------> B
        #                  1
        #
        #  Test 1 TopologyFailover Notes
        #
        #    1. Messages are always sent from A, and go to B.
        #    2. First route should be ADCB.
        #    3. Then we kill connector CD.
        #    4. Next route should be ADB.
        #    5. Then we kill connector BD.
        #    6. Next route should be ACB.
        #    7. Then we kill connector BC.
        #    8. Final route should be AB.

        cls.A_B_cost = 100
        cls.A_C_cost = 50
        cls.A_D_cost = 1
        cls.B_C_cost = 1
        cls.B_D_cost = 20
        cls.C_D_cost = 1

        router('A',
               [client_listener(A_client_port),
                inter_router_listener(A_inter_router_port)])

        router('B',
               [client_listener(B_client_port),
                inter_router_listener(B_inter_router_port),
                connector('AB_connector', A_inter_router_port, cls.A_B_cost)])

        router('C',
               [client_listener(C_client_port),
                inter_router_listener(C_inter_router_port),
                connector('AC_connector', A_inter_router_port, cls.A_C_cost),
                connector('BC_connector', B_inter_router_port, cls.B_C_cost)])

        router('D',
               [client_listener(D_client_port),
                connector('AD_connector', A_inter_router_port, cls.A_D_cost),
                connector('BD_connector', B_inter_router_port, cls.B_D_cost),
                connector('CD_connector', C_inter_router_port, cls.C_D_cost)])

        # Routers were appended in start order: A, B, C, D.
        router_A, router_B, router_C, router_D = cls.routers[:4]

        for peer in ('B', 'C', 'D'):
            router_A.wait_router_connected(peer)

        # First address of each router, in the order A, B, C, D.
        cls.client_addrs = tuple(r.addresses[0]
                                 for r in (router_A, router_B, router_C, router_D))

        # 1 means skip that test.
        cls.skip = {'test_01': 0}

    def test_01_topology_failover(self):
        """Run TopologyFailover on 'closest/01' and require that it reports no error."""
        test_name = 'test_01'
        if self.skip[test_name]:
            self.skipTest("Test skipped during development.")
        failover = TopologyFailover(test_name, self.client_addrs, "closest/01")
        failover.run()
        self.assertEqual(None, failover.error)
| |
| |
| |
| #================================================================ |
| # Tests |
| #================================================================ |
| |
| |
| # Also see 'Test 1 TopologyFailover Notes', above. |
| |
class TopologyFailover(MessagingHandler):
    """
    Test that the lowest-cost route is always chosen in a 4-mesh
    network topology, as one link after another is lost.

    This test also ensures that connections that have been
    deliberately severed do not get restored.

    Messages are sent through the first client address and received
    through the second one; management requests go to the second, third
    and fourth addresses (routers B, C and D).  When the test ends,
    `self.error` is None on success or a string describing the failure.
    """

    # Routers that receive management requests, each paired with the
    # connectors whose presence is verified when its management receiver
    # link opens.
    _MANAGED_CONNECTORS = (
        ('B', ('AB_connector',)),
        ('C', ('AC_connector', 'BC_connector')),
        ('D', ('AD_connector', 'BD_connector', 'CD_connector')),
    )

    def __init__(self, test_name, client_addrs, destination):
        # test_name is accepted for the caller's benefit; it is not used here.
        super(TopologyFailover, self).__init__(prefetch=0)
        self.client_addrs = client_addrs
        self.dest = destination
        self.error = None
        self.sender = None
        self.receiver = None
        self.test_timer = None
        self.send_timer = None
        self.n_sent = 0
        self.n_received = 0
        self.n_accepted = 0
        self.n_released = 0
        self.reactor = None
        self.state = None
        self.send_conn = None
        self.recv_conn = None
        self.nap_time = 2
        self.debug = False

        # Holds the management sender, receiver, and 'helper'
        # associated with each router.
        self.routers = {
            'A': dict(),
            'B': dict(),
            'C': dict(),
            'D': dict()
        }

        # These are the expected routing traces, in the order we
        # expect to receive them.
        self.expected_traces = [
            [u'0/A', u'0/D', u'0/C', u'0/B'],
            [u'0/A', u'0/D', u'0/B'],
            [u'0/A', u'0/C', u'0/B'],
            [u'0/A', u'0/B']
        ]
        self.trace_count = 0

        # This tells the system in what order to kill the connectors.
        self.kill_list = (
            ('D', 'CD_connector'),
            ('D', 'BD_connector'),
            ('C', 'BC_connector')
        )

        # Use this to keep track of which connectors we have found
        # when the test is first getting started and we are checking
        # the topology.
        self.connectors_map = {'AB_connector': 0,
                               'AC_connector': 0,
                               'AD_connector': 0,
                               'BC_connector': 0,
                               'BD_connector': 0,
                               'CD_connector': 0}

    # The simple state machine transitions when certain events happen,
    # if certain conditions are met.  The conditions are checked for
    # by the callbacks for the events.
    # The normal sequence of states in the state machine is:
    #    1. starting        -- doesn't do anything
    #    2. checking        -- checks initial topology
    #    3. examine_trace   -- look at routing trace of first message
    #    4. kill_connector  -- kills the first connector (CD)
    #    5. examine_trace   -- checks routing trace of next message
    #    6. kill_connector  -- kills the next connector (BD)
    #    7. examine_trace   -- checks routing trace of next message
    #    8. kill_connector  -- kills the next connector (BC)
    #    9. examine_trace   -- checks routing trace of final message
    #   10. bailing         -- bails out with success

    def state_transition(self, message, new_state):
        """Move to `new_state`, logging `message` as the reason; no-op if already there."""
        if self.state == new_state:
            return
        self.state = new_state
        self.debug_print("state transition to : %s -- because %s" % (self.state, message))

    def debug_print(self, text):
        """Print `text` prefixed with the current time when `self.debug` is set."""
        if self.debug:
            print("%s %s" % (time.time(), text))

    # Shut down everything and exit.
    def bail(self, text):
        """Record `text` as the result (None means success) and shut everything down."""
        self.error = text

        self.send_conn.close()
        self.recv_conn.close()

        for router_name in ('B', 'C', 'D'):
            self.routers[router_name]['mgmt_conn'].close()

        self.test_timer.cancel()
        self.send_timer.cancel()

    #------------------------------------------------------------------------
    # I want some behavior from this test that is a little too complex
    # to be governed by the usual callback functions. The way I do this
    # is by making a simple state machine that checks some conditions
    # during some callback, and then either steps forward or terminates
    # the test.
    # The callbacks that activate the state machine are mostly on_message,
    # or timeout. But there are two different timers: the one-second
    # timer that mostly runs the test, and the overall TIMEOUT timer that,
    # if it fires, will terminate the test with a timeout error.
    #------------------------------------------------------------------------
    def timeout(self, name):
        """Timer callback; `name` is 'test' (overall timeout) or 'sender' (send tick)."""
        if name == 'test':
            # The original code called a method named set_state here, which
            # this class does not define; state_transition is the method
            # that records a state change.
            self.state_transition('Timeout Expired', 'bailing')
            self.bail("Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" %
                      (self.n_sent, self.n_received, self.n_accepted))
        elif name == 'sender':
            # Only send while a routing trace is awaited; always re-arm
            # the one-second tick.
            if self.state == 'examine_trace':
                self.send()
            self.send_timer = self.reactor.schedule(1, Timeout(self, "sender"))

    def on_start(self, event):
        """Arm both timers and create the data and management connections and links."""
        self.state_transition('on_start', 'starting')
        self.reactor = event.reactor
        self.test_timer = event.reactor.schedule(TIMEOUT, Timeout(self, "test"))
        self.send_timer = event.reactor.schedule(1, Timeout(self, "sender"))
        self.send_conn = event.container.connect(self.client_addrs[0])  # A
        self.recv_conn = event.container.connect(self.client_addrs[1])  # B

        self.sender = event.container.create_sender(self.send_conn, self.dest)
        self.receiver = event.container.create_receiver(self.recv_conn, self.dest)
        self.receiver.flow(100)

        # I will only send management messages to B, C, and D, because
        # they are the owners of the connections that I will want to delete.
        managed = (('B', self.client_addrs[1]),
                   ('C', self.client_addrs[2]),
                   ('D', self.client_addrs[3]))

        # Connections, then receivers, then senders -- the same creation
        # order for all three routers.
        for router_name, addr in managed:
            self.routers[router_name]['mgmt_conn'] = event.container.connect(addr)

        for router_name, _ in managed:
            entry = self.routers[router_name]
            entry['mgmt_receiver'] = event.container.create_receiver(entry['mgmt_conn'], dynamic=True)

        for router_name, _ in managed:
            entry = self.routers[router_name]
            entry['mgmt_sender'] = event.container.create_sender(entry['mgmt_conn'], "$management")

    #-----------------------------------------------------------------
    # At start-time, as the links to the three managed routers
    # open, check each one to make sure that it has all the expected
    # connections.
    #-----------------------------------------------------------------
    def on_link_opened(self, event):
        """Query a router's connectors once its management receiver link opens."""
        # Only enter 'checking' from 'starting', so that a link which
        # opens late cannot push the state machine back out of a later
        # state.
        if self.state == 'starting':
            self.state_transition('on_link_opened', 'checking')

        for router_name, connectors in self._MANAGED_CONNECTORS:
            entry = self.routers[router_name]
            if event.receiver == entry['mgmt_receiver']:
                event.receiver.flow(1000)
                # Replies are addressed to this receiver's remote source.
                entry['mgmt_helper'] = ManagementMessageHelper(event.receiver.remote_source.address)
                for connector in connectors:
                    self.connector_check(router_name, connector)
                break

    def send(self):
        """Send one payload message if the sender has credit."""
        if self.sender.credit <= 0:
            self.receiver.flow(100)
            return
        # Send messages one at a time; the body is the running send count.
        msg = Message(body=self.n_sent)
        self.sender.send(msg)
        self.n_sent += 1
        self.debug_print("sent: %d" % self.n_sent)

    def on_message(self, event):
        """Dispatch an incoming message as a management reply or a payload message."""
        if self._is_management_receiver(event.receiver):
            self._on_management_message(event)
        else:
            self._on_payload_message(event)

    def _is_management_receiver(self, receiver):
        """Return True if `receiver` is one of the B, C or D management receivers."""
        for router_name in ('B', 'C', 'D'):
            if receiver == self.routers[router_name]['mgmt_receiver']:
                return True
        return False

    def _on_management_message(self, event):
        """Handle a reply to a connector query or to a connector delete command."""
        if self.state == 'checking':
            connection_name = event.message.body['name']

            if connection_name not in self.connectors_map:
                reason = "bad connection name: %s" % connection_name
                self.state_transition(reason, 'bailing')
                self.bail(reason)
                # The test is over; do not keep counting connectors.
                return

            self.connectors_map[connection_name] = 1

            n_connections = sum(self.connectors_map.values())
            if n_connections == 6:
                self.state_transition("all %d connections found" % n_connections, 'examine_trace')
        elif self.state == 'kill_connector':
            if event.message.properties["statusDescription"] == 'No Content':
                # We are in the process of killing a connector, and
                # have received the response to the kill message.
                self.state_transition('got kill response', 'examine_trace')
                # This sleep is here because one early bug that this test found
                # (and which is now fixed) involved connections that had been
                # deleted coming back sometimes. It was a race and only happened
                # very occasionally -- but with a pause here, after getting
                # confirmation that we have successfully deleted the connector,
                # the bug would show up 60 to 75% of the time. I think that leaving
                # this sleep here is the only way to ensure that that particular
                # bug stays fixed.
                time.sleep(self.nap_time)

    def _on_payload_message(self, event):
        """Count a payload message and, when a trace is awaited, check its route."""
        self.n_received += 1
        if self.state != 'examine_trace':
            return

        trace = event.message.annotations['x-opt-qd.trace']
        expected = self.expected_traces[self.trace_count]

        if trace != expected:
            reason = "expected trace %s but got %s" % (expected, trace)
            self.state_transition(reason, 'bailing')
            self.bail(reason)
            return

        if self.trace_count == len(self.expected_traces) - 1:
            # Last expected route seen: finish with success.
            self.state_transition('final expected trace %s observed' % expected, 'bailing')
            self.bail(None)
            return

        self.state_transition("expected trace %d observed successfully %s" % (self.trace_count, expected),
                              'kill_connector')
        self.kill_a_connector(self.kill_list[self.trace_count])
        self.trace_count += 1

    def on_accepted(self, event):
        """Count an accepted delivery."""
        self.n_accepted += 1

    def on_released(self, event):
        """Count a released delivery."""
        self.n_released += 1

    def connector_check(self, router, connector):
        """Send a query for `connector` to the management sender of `router`."""
        self.debug_print("checking connector for router %s" % router)
        entry = self.routers[router]
        msg = entry['mgmt_helper'].make_connector_query(connector)
        entry['mgmt_sender'].send(msg)

    def kill_a_connector(self, target):
        """Send a delete command for `target`, a (router, connector) pair."""
        router, connector = target[0], target[1]
        self.debug_print("killing connector %s on router %s" % (connector, router))
        entry = self.routers[router]
        msg = entry['mgmt_helper'].make_connector_delete_command(connector)
        entry['mgmt_sender'].send(msg)

    def run(self):
        """Run this handler in a Container until the test finishes."""
        Container(self).run()
| |
| |
class RouterFluxTest(TestCase):
    """
    Verify route table addresses are flushed properly when a remote router is
    rebooted or the link is determined to be stale.
    """

    # Management entity type queried to list a router's addresses.
    _ADDRESS_TYPE = 'org.apache.qpid.dispatch.router.address'

    def _create_router(self, name,
                       ra_interval=None,
                       ra_stale=None,
                       ra_flux=None,
                       extra=None):
        """
        Start an interior router called `name` and return it without
        waiting for it to be ready.

        ra_interval, ra_flux and ra_stale set raIntervalSeconds,
        raIntervalFluxSeconds and remoteLsMaxAgeSeconds; a value that is
        None (or otherwise falsy) selects the defaults 30, 4 and 60.
        `extra` is an optional list of further config entities.
        """
        config = [
            ('router', {'id': name,
                        'mode': 'interior',
                        # these are the default values from qdrouter.json
                        'raIntervalSeconds': ra_interval or 30,
                        'raIntervalFluxSeconds': ra_flux or 4,
                        'remoteLsMaxAgeSeconds': ra_stale or 60}),
            ('listener', {'role': 'normal',
                          'port': self.tester.get_port()}),
            ('address', {'prefix': 'closest',
                         'distribution': 'closest'}),
            ('address', {'prefix': 'multicast',
                         'distribution': 'multicast'}),
        ]

        if extra:
            config.extend(extra)
        return self.tester.qdrouterd(name, Qdrouterd.Config(config),
                                     wait=False, expect=None)

    def _deploy_routers(self,
                        ra_interval=None,
                        ra_stale=None,
                        ra_flux=None):
        """
        Start the three-router line, attach two consumers for
        'closest/on_A' to INT.A and wait until that address is known on
        INT.C.

        Returns the tuple (INT_A, INT_B, INT_C, consumers).
        """
        # configuration:
        # linear 3 interior routers
        #
        #  +-------+    +-------+    +-------+
        #  | INT.A |<==>| INT.B |<==>| INT.C |
        #  +-------+    +-------+    +-------+
        #
        # INT.B has an inter-router listener, INT.A and INT.C connect in

        i_r_port = self.tester.get_port()

        INT_A = self._create_router('INT.A',
                                    ra_interval,
                                    ra_stale,
                                    ra_flux,
                                    extra=[('connector',
                                            {'role': 'inter-router',
                                             'name': 'connectorToB',
                                             'port': i_r_port})])
        INT_A.listener = INT_A.addresses[0]

        INT_B = self._create_router('INT.B',
                                    ra_interval,
                                    ra_stale,
                                    ra_flux,
                                    extra=[('listener',
                                            {'role': 'inter-router',
                                             'port': i_r_port})])
        # remembered so that a test can reconnect a restarted INT.A
        INT_B.inter_router_port = i_r_port

        INT_C = self._create_router('INT.C',
                                    ra_interval,
                                    ra_stale,
                                    ra_flux,
                                    extra=[('connector',
                                            {'role': 'inter-router',
                                             'name': 'connectorToB',
                                             'port': i_r_port})])
        #
        # wait until router network is formed
        #
        INT_B.wait_router_connected('INT.A')
        INT_B.wait_router_connected('INT.C')

        #
        # create mobile addresses on INT_A
        #
        consumers = [
            AsyncTestReceiver(INT_A.listener,
                              source='closest/on_A'),
            AsyncTestReceiver(INT_A.listener,
                              source='closest/on_A')]
        #
        # wait for addresses to show up on INT.C
        #
        INT_C.wait_address('closest/on_A')

        return (INT_A, INT_B, INT_C, consumers)

    def _wait_address_flushed(self, router, address, timeout=TIMEOUT):
        """
        Poll `router` until none of its addresses has a name containing
        `address`.

        Fails the test if that has not happened within `timeout` seconds,
        so a missing flush is reported instead of polling forever.
        """
        mgmt = router.management
        deadline = time.time() + timeout
        while True:
            rsp = mgmt.query(self._ADDRESS_TYPE).get_dicts()
            if not any(address in a['name'] for a in rsp):
                return
            if time.time() >= deadline:
                self.fail("address '%s' was not flushed within %s seconds"
                          % (address, timeout))
            time.sleep(0.25)

    def test_01_reboot_INT_A(self):
        """
        When a router comes online after a reboot its route table sequence will
        be different from the last update it sent.  This should cause the local
        router to flush all mobile addresses it learned from the remote router
        before it rebooted.

        Reboot INT.A and expect its mobile addresses are flushed on INT_C
        """

        # bump the remoteLsMaxAgeSeconds to longer than the test timeout so the
        # test will timeout if the addresses are not removed before the link is
        # considered stale
        stale_timeout = int(TIMEOUT * 2)
        INT_A, INT_B, INT_C, consumers = self._deploy_routers(ra_stale=stale_timeout)

        # at this point all routers are running and the mobile addresses have
        # propagated to INT_C.  Now reboot INT_A
        INT_A.teardown()

        # stop consumers so INT_A's route table will be different when it comes
        # back online so it will require an immediate sync
        for c in consumers:
            c.stop()

        time.sleep(1.0)
        INT_A = self._create_router('INT.A',
                                    ra_stale=stale_timeout,
                                    extra=[('connector',
                                            {'role': 'inter-router',
                                             'name': 'connectorToB',
                                             'port':
                                             INT_B.inter_router_port})])
        INT_A.wait_router_connected('INT.B')

        # expect: INT_A mobile addresses should be removed from INT_C
        # immediately rather than waiting for the remoteLsMaxAgeSeconds timeout
        self._wait_address_flushed(INT_C, 'closest/on_A')

    def test_02_shutdown_INT_A(self):
        """
        When a neighboring router is no longer available, the routing algorithm
        does not immediately remove the mobile addresses.  Instead it waits
        remoteLsMaxAgeSeconds to give the router time to come back.  This allows
        the route table to avoid costly updates should the network temporarily
        bounce.

        Delete INT.A and expect its mobile addresses are flushed on INT_C after
        remoteLsMaxAgeSeconds
        """

        # shorten the RA intervals to speed up the test:
        max_age = 6
        INT_A, INT_B, INT_C, consumers = self._deploy_routers(ra_interval=2,
                                                              ra_stale=max_age,
                                                              ra_flux=1)

        # at this point all routers are running and the mobile addresses have
        # propagated to INT_C.  Now remove INT_A
        INT_A.teardown()
        for c in consumers:
            c.stop()

        start = time.time()

        # wait for INT_A mobile addresses to be removed from INT_C, this
        # should happen after ra_stale seconds
        self._wait_address_flushed(INT_C, 'closest/on_A')
        elapsed = time.time() - start

        # bit of a hack but ensure that the flush did not take an unreasonably
        # long time with respect to the ra_stale value (3x is a guess btw)
        self.assertTrue(elapsed <= (3.0 * max_age),
                        "flush took %s seconds, expected at most %s"
                        % (elapsed, 3.0 * max_age))
| |
| |
if __name__ == '__main__':
    # Executed as a script: hand the value returned by main_module() to
    # the unittest runner.
    entry_module = main_module()
    unittest.main(entry_module)