DISPATCH-1547: add tests for router table updates
diff --git a/tests/system_test.py b/tests/system_test.py
index 3857ae8..fa4b338 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -388,12 +388,14 @@
             self.defaults()
             return "".join(["%s {\n%s}\n"%(n, props(p, 1)) for n, p in self])
 
-    def __init__(self, name=None, config=Config(), pyinclude=None, wait=True, perform_teardown=True, cl_args=[]):
+    def __init__(self, name=None, config=Config(), pyinclude=None, wait=True,
+                 perform_teardown=True, cl_args=None, expect=Process.RUNNING):
         """
         @param name: name used for for output files, default to id from config.
         @param config: router configuration
         @keyword wait: wait for router to be ready (call self.wait_ready())
         """
+        cl_args = cl_args or []
         self.config = copy(config)
         self.perform_teardown = perform_teardown
         if not name: name = self.config.router_id
@@ -413,7 +415,7 @@
             args += ['-I', os.path.join(env_home, 'python')]
 
         args = os.environ.get('QPID_DISPATCH_RUNNER', '').split() + args
-        super(Qdrouterd, self).__init__(args, name=name, expect=Process.RUNNING)
+        super(Qdrouterd, self).__init__(args, name=name, expect=expect)
         self._management = None
         self._wait_ready = False
         if wait:
@@ -428,8 +430,10 @@
 
     def teardown(self):
         if self._management:
-            try: self._management.close()
+            try:
+                self._management.close()
             except: pass
+            self._management = None
 
         if not self.perform_teardown:
             return
@@ -631,7 +635,6 @@
         if errors:
             raise RuntimeError("Errors during teardown: \n\n%s" % "\n\n".join([str(e) for e in errors]))
 
-
     def cleanup(self, x):
         """Record object x for clean-up during tear-down.
         x should have on of the methods teardown, tearDown, stop or close"""
@@ -829,6 +832,7 @@
             raise Exception("Timed out waiting for receiver start")
 
     def _main(self):
+        self._container.timeout = 5.0
         self._container.start()
         while self._container.process():
             if self._stop_thread:
@@ -870,6 +874,13 @@
     def on_message(self, event):
         self.queue.put(event.message)
 
+    def on_disconnected(self, event):
+        # if remote terminates the connection kill the thread else it will spin
+        # on the cpu
+        if self._conn:
+            self._conn.close()
+            self._conn = None
+
 
 class AsyncTestSender(MessagingHandler):
     """
@@ -906,6 +917,7 @@
         self._thread.start()
 
     def _main(self):
+        self._container.timeout = 5.0
         self._container.start()
         while self._container.process():
             self._check_if_done()
@@ -965,8 +977,17 @@
 
     def on_link_error(self, event):
         self.error = "link error:%s" % str(event.link.remote_condition)
-        self._conn.close()
-        self._conn = None
+        if self._conn:
+            self._conn.close()
+            self._conn = None
+
+    def on_disconnected(self, event):
+        # if remote terminates the connection kill the thread else it will spin
+        # on the cpu
+        self.error = "connection to remote dropped"
+        if self._conn:
+            self._conn.close()
+            self._conn = None
 
 
 class QdManager(object):
diff --git a/tests/system_tests_topology.py b/tests/system_tests_topology.py
index 32f6459..e64553c 100644
--- a/tests/system_tests_topology.py
+++ b/tests/system_tests_topology.py
@@ -23,7 +23,9 @@
 from __future__ import print_function
 
 from proton          import Message, Timeout
+from system_test import AsyncTestReceiver
 from system_test     import TestCase, Qdrouterd, main_module
+from system_test import TIMEOUT
 from system_test import unittest
 from proton.handlers import MessagingHandler
 from proton.reactor  import Container
@@ -592,5 +594,185 @@
         Container(self).run()
 
 
+class RouterFluxTest(TestCase):
+    """
+    Verify route table addresses are flushed properly when a remote router is
+    rebooted or the link is determined to be stale.
+    """
+
+    def _create_router(self, name,
+                       ra_interval=None,
+                       ra_stale=None,
+                       ra_flux=None,
+                       extra=None):
+
+        config = [
+            ('router', {'id': name,
+                        'mode': 'interior',
+                        # these are the default values from qdrouter.json
+                        'raIntervalSeconds': ra_interval or 30,
+                        'raIntervalFluxSeconds': ra_flux or 4,
+                        'remoteLsMaxAgeSeconds': ra_stale or 60}),
+            ('listener', {'role': 'normal',
+                          'port': self.tester.get_port()}),
+            ('address', {'prefix': 'closest',
+                         'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast',
+                         'distribution': 'multicast'}),
+        ]
+
+        if extra:
+            config.extend(extra)
+        return self.tester.qdrouterd(name, Qdrouterd.Config(config),
+                                     wait=False, expect=None)
+
+    def _deploy_routers(self,
+                        ra_interval=None,
+                        ra_stale=None,
+                        ra_flux=None):
+        # configuration:
+        # linear 3 interior routers
+        #
+        #  +-------+    +-------+    +-------+
+        #  | INT.A |<==>| INT.B |<==>| INT.C |
+        #  +-------+    +-------+    +-------+
+        #
+        # INT.B has an inter-router listener, INT.A and INT.C connect in
+
+        i_r_port = self.tester.get_port()
+
+        INT_A = self._create_router('INT.A',
+                                    ra_interval,
+                                    ra_stale,
+                                    ra_flux,
+                                    extra=[('connector',
+                                            {'role': 'inter-router',
+                                             'name': 'connectorToB',
+                                             'port': i_r_port})])
+        INT_A.listener = INT_A.addresses[0]
+
+        INT_B = self._create_router('INT.B',
+                                    ra_interval,
+                                    ra_stale,
+                                    ra_flux,
+                                    extra=[('listener',
+                                            {'role': 'inter-router',
+                                             'port': i_r_port})])
+        INT_B.inter_router_port = i_r_port
+
+        INT_C = self._create_router('INT.C',
+                                    ra_interval,
+                                    ra_stale,
+                                    ra_flux,
+                                    extra=[('connector',
+                                            {'role': 'inter-router',
+                                             'name': 'connectorToB',
+                                             'port': i_r_port})])
+        #
+        # wait until router network is formed
+        #
+        INT_B.wait_router_connected('INT.A')
+        INT_B.wait_router_connected('INT.C')
+
+        #
+        # create mobile addresses on INT_A
+        #
+        consumers = [
+            AsyncTestReceiver(INT_A.listener,
+                              source='closest/on_A'),
+            AsyncTestReceiver(INT_A.listener,
+                              source='closest/on_A')]
+        #
+        # wait for addresses to show up on INT.C
+        #
+        INT_C.wait_address('closest/on_A')
+
+        return (INT_A, INT_B, INT_C, consumers)
+
+    def test_01_reboot_INT_A(self):
+        """
+        When a router comes online after a reboot its route table sequence will
+        be different from the last update it sent.  This should cause the local
+        router to flush all mobile addresses it learned from the remote router
+        before it rebooted.
+
+        Reboot INT.A and expect its mobile addresses are flushed on INT_C
+        """
+
+        # bump the remoteLsMaxAgeSeconds to longer than the test timeout so the
+        # test will timeout if the addresses are not removed before the link is
+        # considered stale
+        stale_timeout = int(TIMEOUT * 2)
+        INT_A, INT_B, INT_C, consumers = self._deploy_routers(ra_stale=stale_timeout)
+
+        # at this point all routers are running and the mobile addresses have
+        # propagated to INT_C.  Now reboot INT_A
+        INT_A.teardown()
+
+        # stop consumers so INT_A's route table will be different when it comes
+        # back online so it will require an immediate sync
+        for c in consumers:
+            c.stop()
+
+        time.sleep(1.0)
+        INT_A = self._create_router('INT.A',
+                                    ra_stale=stale_timeout,
+                                    extra=[('connector',
+                                            {'role': 'inter-router',
+                                             'name': 'connectorToB',
+                                             'port':
+                                             INT_B.inter_router_port})])
+        INT_A.wait_router_connected('INT.B')
+
+        # expect: INT_A mobile addresses should be removed from INT_C
+        # immediately rather than waiting for the remoteLsMaxAgeSeconds timeout
+
+        mgmt = INT_C.management
+        a_type = 'org.apache.qpid.dispatch.router.address'
+        rsp = mgmt.query(a_type).get_dicts()
+        while any(map(lambda a: a['name'].find('closest/on_A') != -1, rsp)):
+            time.sleep(0.25)
+            rsp = mgmt.query(a_type).get_dicts()
+
+    def test_02_shutdown_INT_A(self):
+        """
+        When a neighboring router is no longer available, the routing algorithm
+        does not immediately remove the mobile addresses.  Instead it waits
+        remoteLsMaxAgeSeconds to give the router time to come back.  This allows
+        the route table to avoid costly updates should the network temporarily
+        bounce.
+
+        Delete INT.A and expect its mobile addresses are flushed on INT_C after
+        remoteLsMaxAgeSeconds
+        """
+
+        # shorten the RA intervals to speed up the test:
+        max_age = 6
+        INT_A, INT_B, INT_C, consumers = self._deploy_routers(ra_interval=2,
+                                                              ra_stale=max_age,
+                                                              ra_flux=1)
+
+        # at this point all routers are running and the mobile addresses have
+        # propagated to INT_C.  Now remove INT_A
+        INT_A.teardown()
+        for c in consumers:
+            c.stop()
+
+        start = time.time()
+
+        # wait for INT_A mobile addresses to be removed from INT_C, this
+        # should happen after ra_stale seconds
+        mgmt = INT_C.management
+        a_type = 'org.apache.qpid.dispatch.router.address'
+        rsp = mgmt.query(a_type).get_dicts()
+        while any(map(lambda a: a['name'].find('closest/on_A') != -1, rsp)):
+            time.sleep(0.25)
+            rsp = mgmt.query(a_type).get_dicts()
+
+        # bit of a hack but ensure that the flush did not take an unreasonably
+        # long time with respect to the ra_stale value (3x is a guess btw)
+        self.assertTrue(time.time() - start <= (3.0 * max_age))
+
+
 if __name__ == '__main__':
     unittest.main(main_module())