DISPATCH-1547: add tests for router table updates
diff --git a/tests/system_test.py b/tests/system_test.py
index 3857ae8..fa4b338 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -388,12 +388,14 @@
self.defaults()
return "".join(["%s {\n%s}\n"%(n, props(p, 1)) for n, p in self])
- def __init__(self, name=None, config=Config(), pyinclude=None, wait=True, perform_teardown=True, cl_args=[]):
+ def __init__(self, name=None, config=Config(), pyinclude=None, wait=True,
+ perform_teardown=True, cl_args=None, expect=Process.RUNNING):
"""
@param name: name used for for output files, default to id from config.
@param config: router configuration
@keyword wait: wait for router to be ready (call self.wait_ready())
"""
+ cl_args = cl_args or []
self.config = copy(config)
self.perform_teardown = perform_teardown
if not name: name = self.config.router_id
@@ -413,7 +415,7 @@
args += ['-I', os.path.join(env_home, 'python')]
args = os.environ.get('QPID_DISPATCH_RUNNER', '').split() + args
- super(Qdrouterd, self).__init__(args, name=name, expect=Process.RUNNING)
+ super(Qdrouterd, self).__init__(args, name=name, expect=expect)
self._management = None
self._wait_ready = False
if wait:
@@ -428,8 +430,10 @@
def teardown(self):
if self._management:
- try: self._management.close()
+ try:
+ self._management.close()
except: pass
+ self._management = None
if not self.perform_teardown:
return
@@ -631,7 +635,6 @@
if errors:
raise RuntimeError("Errors during teardown: \n\n%s" % "\n\n".join([str(e) for e in errors]))
-
def cleanup(self, x):
"""Record object x for clean-up during tear-down.
x should have on of the methods teardown, tearDown, stop or close"""
@@ -829,6 +832,7 @@
raise Exception("Timed out waiting for receiver start")
def _main(self):
+ self._container.timeout = 5.0
self._container.start()
while self._container.process():
if self._stop_thread:
@@ -870,6 +874,13 @@
def on_message(self, event):
self.queue.put(event.message)
+ def on_disconnected(self, event):
+ # if remote terminates the connection kill the thread else it will spin
+ # on the cpu
+ if self._conn:
+ self._conn.close()
+ self._conn = None
+
class AsyncTestSender(MessagingHandler):
"""
@@ -906,6 +917,7 @@
self._thread.start()
def _main(self):
+ self._container.timeout = 5.0
self._container.start()
while self._container.process():
self._check_if_done()
@@ -965,8 +977,17 @@
def on_link_error(self, event):
self.error = "link error:%s" % str(event.link.remote_condition)
- self._conn.close()
- self._conn = None
+ if self._conn:
+ self._conn.close()
+ self._conn = None
+
+ def on_disconnected(self, event):
+ # if remote terminates the connection kill the thread else it will spin
+ # on the cpu
+ self.error = "connection to remote dropped"
+ if self._conn:
+ self._conn.close()
+ self._conn = None
class QdManager(object):
diff --git a/tests/system_tests_topology.py b/tests/system_tests_topology.py
index 32f6459..e64553c 100644
--- a/tests/system_tests_topology.py
+++ b/tests/system_tests_topology.py
@@ -23,7 +23,9 @@
from __future__ import print_function
from proton import Message, Timeout
+from system_test import AsyncTestReceiver
from system_test import TestCase, Qdrouterd, main_module
+from system_test import TIMEOUT
from system_test import unittest
from proton.handlers import MessagingHandler
from proton.reactor import Container
@@ -592,5 +594,185 @@
Container(self).run()
+class RouterFluxTest(TestCase):
+ """
+ Verify route table addresses are flushed properly when a remote router is
+ rebooted or the link is determined to be stale.
+ """
+
+ def _create_router(self, name,
+ ra_interval=None,
+ ra_stale=None,
+ ra_flux=None,
+ extra=None):
+
+ config = [
+ ('router', {'id': name,
+ 'mode': 'interior',
+ # these are the default values from qdrouter.json
+ 'raIntervalSeconds': ra_interval or 30,
+ 'raIntervalFluxSeconds': ra_flux or 4,
+ 'remoteLsMaxAgeSeconds': ra_stale or 60}),
+ ('listener', {'role': 'normal',
+ 'port': self.tester.get_port()}),
+ ('address', {'prefix': 'closest',
+ 'distribution': 'closest'}),
+ ('address', {'prefix': 'multicast',
+ 'distribution': 'multicast'}),
+ ]
+
+ if extra:
+ config.extend(extra)
+ return self.tester.qdrouterd(name, Qdrouterd.Config(config),
+ wait=False, expect=None)
+
+ def _deploy_routers(self,
+ ra_interval=None,
+ ra_stale=None,
+ ra_flux=None):
+ # configuration:
+ # linear 3 interior routers
+ #
+ # +-------+ +-------+ +-------+
+ # | INT.A |<==>| INT.B |<==>| INT.C |
+ # +-------+ +-------+ +-------+
+ #
+ # INT.B has an inter-router listener, INT.A and INT.C connect in
+
+ i_r_port = self.tester.get_port()
+
+ INT_A = self._create_router('INT.A',
+ ra_interval,
+ ra_stale,
+ ra_flux,
+ extra=[('connector',
+ {'role': 'inter-router',
+ 'name': 'connectorToB',
+ 'port': i_r_port})])
+ INT_A.listener = INT_A.addresses[0]
+
+ INT_B = self._create_router('INT.B',
+ ra_interval,
+ ra_stale,
+ ra_flux,
+ extra=[('listener',
+ {'role': 'inter-router',
+ 'port': i_r_port})])
+ INT_B.inter_router_port = i_r_port
+
+ INT_C = self._create_router('INT.C',
+ ra_interval,
+ ra_stale,
+ ra_flux,
+ extra=[('connector',
+ {'role': 'inter-router',
+ 'name': 'connectorToB',
+ 'port': i_r_port})])
+ #
+ # wait until router network is formed
+ #
+ INT_B.wait_router_connected('INT.A')
+ INT_B.wait_router_connected('INT.C')
+
+ #
+ # create mobile addresses on INT_A
+ #
+ consumers = [
+ AsyncTestReceiver(INT_A.listener,
+ source='closest/on_A'),
+ AsyncTestReceiver(INT_A.listener,
+ source='closest/on_A')]
+ #
+ # wait for addresses to show up on INT.C
+ #
+ INT_C.wait_address('closest/on_A')
+
+ return (INT_A, INT_B, INT_C, consumers)
+
+ def test_01_reboot_INT_A(self):
+ """
+ When a router comes online after a reboot its route table sequence will
+ be different from the last update it sent. This should cause the local
+ router to flush all mobile addresses it learned from the remote router
+ before it rebooted.
+
+ Reboot INT.A and expect its mobile addresses are flushed on INT_C
+ """
+
+ # bump the remoteLsMaxAgeSeconds to longer than the test timeout so the
+ # test will timeout if the addresses are not removed before the link is
+ # considered stale
+ stale_timeout = int(TIMEOUT * 2)
+ INT_A, INT_B, INT_C, consumers = self._deploy_routers(ra_stale=stale_timeout)
+
+ # at this point all routers are running and the mobile addresses have
+ # propagated to INT_C. Now reboot INT_A
+ INT_A.teardown()
+
+ # stop consumers so INT_A's route table will be different when it comes
+ # back online so it will require an immediate sync
+ for c in consumers:
+ c.stop()
+
+ time.sleep(1.0)
+ INT_A = self._create_router('INT.A',
+ ra_stale=stale_timeout,
+ extra=[('connector',
+ {'role': 'inter-router',
+ 'name': 'connectorToB',
+ 'port':
+ INT_B.inter_router_port})])
+ INT_A.wait_router_connected('INT.B')
+
+ # expect: INT_A mobile addresses should be removed from INT_C
+ # immediately rather than waiting for the remoteLsMaxAgeSeconds timeout
+
+ mgmt = INT_C.management
+ a_type = 'org.apache.qpid.dispatch.router.address'
+ rsp = mgmt.query(a_type).get_dicts()
+ while any(map(lambda a: a['name'].find('closest/on_A') != -1, rsp)):
+ time.sleep(0.25)
+ rsp = mgmt.query(a_type).get_dicts()
+
+ def test_02_shutdown_INT_A(self):
+ """
+ When a neighboring router is no longer available, the routing algorithm
+ does not immediately remove the mobile addresses. Instead it waits
+ remoteLsMaxAgeSeconds to give the router time to come back. This allows
+ the route table to avoid costly updates should the network temporarily
+ bounce.
+
+ Delete INT.A and expect its mobile addresses are flushed on INT_C after
+ remoteLsMaxAgeSeconds
+ """
+
+ # shorten the RA intervals to speed up the test:
+ max_age = 6
+ INT_A, INT_B, INT_C, consumers = self._deploy_routers(ra_interval=2,
+ ra_stale=max_age,
+ ra_flux=1)
+
+ # at this point all routers are running and the mobile addresses have
+ # propagated to INT_C. Now remove INT_A
+ INT_A.teardown()
+ for c in consumers:
+ c.stop()
+
+ start = time.time()
+
+ # wait for INT_A mobile addresses to be removed from INT_C, this
+ # should happen after ra_stale seconds
+ mgmt = INT_C.management
+ a_type = 'org.apache.qpid.dispatch.router.address'
+ rsp = mgmt.query(a_type).get_dicts()
+ while any(map(lambda a: a['name'].find('closest/on_A') != -1, rsp)):
+ time.sleep(0.25)
+ rsp = mgmt.query(a_type).get_dicts()
+
+ # bit of a hack but ensure that the flush did not take an unreasonably
+ # long time with respect to the ra_stale value (3x is a guess btw)
+ self.assertTrue(time.time() - start <= (3.0 * max_age))
+
+
if __name__ == '__main__':
unittest.main(main_module())