| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import unicode_literals |
| from __future__ import division |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| import os |
| from threading import Timer |
| import json, re |
| from system_test import main_module, TIMEOUT |
| from system_test import TestCase, Qdrouterd, Process, TIMEOUT |
| from system_test import unittest |
| from subprocess import PIPE, STDOUT |
| |
| |
| class FailoverTest(TestCase): |
| inter_router_port = None |
| |
| @classmethod |
| def router(cls, name, config): |
| config = Qdrouterd.Config(config) |
| |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) |
| |
| @classmethod |
| def setUpClass(cls): |
| super(FailoverTest, cls).setUpClass() |
| |
| cls.routers = [] |
| |
| cls.inter_router_port = cls.tester.get_port() |
| cls.inter_router_port_1 = cls.tester.get_port() |
| cls.backup_port = cls.tester.get_port() |
| cls.backup_url = 'amqp://0.0.0.0:' + str(cls.backup_port) |
| cls.my_server_port = cls.tester.get_port() |
| |
| cls.failover_list = 'amqp://third-host:5671, ' + cls.backup_url |
| |
| # |
| # Router A tries to connect to Router B via its connectorToB. Router B responds with an open frame which will |
| # have the failover-server-list as one of its connection properties like the following - |
| # [0x13024d0]:0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, |
| # idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", |
| # properties={:product="qpid-dispatch-router", :version="1.0.0", |
| # :"failover-server-list"=[{:"network-host"="some-host", :port="35000"}, |
| # {:"network-host"="0.0.0.0", :port="25000"}]}] |
| # |
| # The suite of tests determine if the router receiving this open frame stores it properly and if the |
| # original connection goes down, check that the router is trying to make connections to the failover urls. |
| # |
| FailoverTest.router('B', [ |
| ('router', {'mode': 'interior', 'id': 'B'}), |
| ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': cls.inter_router_port, |
| 'failoverUrls': cls.failover_list}), |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}), |
| ] |
| ) |
| |
| FailoverTest.router('A', |
| [ |
| ('router', {'mode': 'interior', 'id': 'A'}), |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}), |
| ('connector', {'name': 'connectorToB', 'role': 'inter-router', |
| 'port': cls.inter_router_port, 'verifyHostname': 'no'}), |
| ] |
| ) |
| |
| FailoverTest.router('C', [ |
| ('router', {'mode': 'interior', 'id': 'C'}), |
| ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': cls.backup_port}), |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}), |
| ] |
| ) |
| |
| cls.routers[1].wait_router_connected('B') |
| |
| def __init__(self, test_method): |
| TestCase.__init__(self, test_method) |
| self.success = False |
| self.timer_delay = 2 |
| self.max_attempts = 10 |
| self.attempts = 0 |
| |
| def address(self): |
| return self.routers[1].addresses[0] |
| |
| def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): |
| p = self.popen( |
| ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], |
| stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, |
| universal_newlines=True) |
| out = p.communicate(input)[0] |
| try: |
| p.teardown() |
| except Exception as e: |
| raise Exception("%s\n%s" % (e, out)) |
| return out |
| |
| def run_qdstat(self, args, regexp=None, address=None): |
| p = self.popen( |
| ['qdstat', '--bus', str(address or self.router.addresses[0]), '--timeout', str(TIMEOUT) ] + args, |
| name='qdstat-'+self.id(), stdout=PIPE, expect=None, |
| universal_newlines=True) |
| |
| out = p.communicate()[0] |
| assert p.returncode == 0, \ |
| "qdstat exit status %s, output:\n%s" % (p.returncode, out) |
| if regexp: assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % (regexp, out) |
| return out |
| |
| def test_1_connector_has_failover_list(self): |
| """ |
| This is the most simple and straightforward case. Router A connects to Router B. Router B sends |
| failover information to Router A. |
| We make a qdmanage connector query to Router A which checks if Router A is storing the failover information |
| received from Router B.The failover list must consist of the original connection info (from the connector) |
| followed by the two items sent by the Router B (stored in cls.failover_list) |
| The 'failoverUrls' is comma separated. |
| """ |
| long_type = 'org.apache.qpid.dispatch.connector' |
| query_command = 'QUERY --type=' + long_type |
| output = json.loads(self.run_qdmanage(query_command)) |
| expected = "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", " + FailoverTest.failover_list |
| |
| self.assertEqual(expected, output[0]['failoverUrls']) |
| |
| def schedule_B_to_C_failover_test(self): |
| if self.attempts < self.max_attempts: |
| if not self.success: |
| Timer(self.timer_delay, self.check_C_connector).start() |
| self.attempts += 1 |
| |
| def check_C_connector(self): |
| long_type = 'org.apache.qpid.dispatch.connector' |
| query_command = 'QUERY --type=' + long_type |
| output = json.loads(self.run_qdmanage(query_command, address=self.routers[1].addresses[0])) |
| |
| expected = FailoverTest.backup_url + ", " + "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) \ |
| + ", " + "amqp://third-host:5671" |
| |
| if output[0].get('failoverUrls') == expected: |
| self.success = True |
| else: |
| self.schedule_B_to_C_failover_test() |
| |
| def can_terminate(self): |
| if self.attempts == self.max_attempts: |
| return True |
| |
| if self.success: |
| return True |
| |
| return False |
| |
| def test_2_remove_router_B(self): |
| """ |
| In this test, we are killing Router B. As a result, Router A should try to connect to Router C. |
| Router C does NOT have a failover list, so the open frame that Router C sends to Router A will not contain |
| the failover-server-list property..Hence the failoverUrls list will remain unchanged except that the order of |
| the URLs would be different. |
| """ |
| |
| # First make sure there are no inter-router connections on router C |
| outs = self.run_qdstat(['--connections'], address=self.routers[2].addresses[1]) |
| |
| inter_router = 'inter-router' in outs |
| self.assertFalse(inter_router) |
| |
| # Kill the router B |
| FailoverTest.routers[0].teardown() |
| |
| # Schedule a test to make sure that the failover url is available |
| # and Router C has an inter-router connection |
| self.schedule_B_to_C_failover_test() |
| |
| while not self.can_terminate(): |
| pass |
| |
| self.assertTrue(self.success) |
| |
| |
| def schedule_C_to_B_failover_test(self): |
| if self.attempts < self.max_attempts: |
| if not self.success: |
| Timer(self.timer_delay, self.check_B_connector).start() |
| self.attempts += 1 |
| |
| def check_B_connector(self): |
| # Router A should now try to connect to Router B again since we killed Router C. |
| long_type = 'org.apache.qpid.dispatch.connector' |
| query_command = 'QUERY --type=' + long_type |
| output = json.loads(self.run_qdmanage(query_command, address=self.routers[1].addresses[0])) |
| |
| # The order that the URLs appear in the failoverUrls is important. This is the order in which the router |
| # will attempt to make connections in case the existing connection goes down. |
| |
| expected = "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", " + \ |
| FailoverTest.failover_list + \ |
| ', amqp://127.0.0.1:%d' % FailoverTest.my_server_port |
| |
| if output[0].get('failoverUrls') == expected: |
| self.success = True |
| else: |
| self.schedule_C_to_B_failover_test() |
| |
| def test_3_reinstate_router_B(self): |
| """ |
| In this test, we are restarting Router B and killing Router C. Router A should now try to connect back to |
| Router B since it maintains the original connection info to Router B from the connector config information. |
| Before starting Router B back again, we |
| have a small config change to Router B wherein we are adding a new failover url to the original list. |
| This new failover url |
| points to our own server which will accept connections. This server will actually be used in the next test |
| but this test maskes sure that the new server url also shows up in the failoverUrls list. |
| """ |
| FailoverTest.router('B', [ |
| ('router', {'mode': 'interior', 'id': 'B'}), |
| ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': FailoverTest.inter_router_port, |
| 'failoverUrls': FailoverTest.failover_list + ', amqp://127.0.0.1:%d' % FailoverTest.my_server_port}), |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': FailoverTest.tester.get_port()}), |
| ]) |
| |
| FailoverTest.routers[3].wait_ready() |
| |
| # Kill the router C. |
| # Now since Router B is up and running, router A should try to re-connect to Router B. |
| # This will prove that the router A is preserving the original connector information specified in its config. |
| FailoverTest.routers[2].teardown() |
| |
| self.success = False |
| self.attempts = 0 |
| |
| # Schedule a test to make sure that the failover url is available |
| self.schedule_C_to_B_failover_test() |
| |
| while not self.can_terminate(): |
| pass |
| |
| self.assertTrue(self.success) |
| |
| def check_A_connector(self): |
| # Router A should now try to connect to Router B again since we killed Router C. |
| long_type = 'org.apache.qpid.dispatch.connector' |
| query_command = 'QUERY --type=' + long_type |
| output = json.loads(self.run_qdmanage(query_command, address=self.routers[1].addresses[0])) |
| |
| # The order that the URLs appear in the failoverUrls is important. This is the order in which the router |
| # will attempt to make connections in case the existing connection goes down. |
| expected = 'amqp://127.0.0.1:%d' % FailoverTest.my_server_port + ", " + "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) |
| |
| if output[0].get('failoverUrls') == expected: |
| self.success = True |
| else: |
| self.schedule_B_to_my_server_failover_test() |
| |
| def schedule_B_to_my_server_failover_test(self): |
| if self.attempts < self.max_attempts: |
| if not self.success: |
| Timer(self.timer_delay, self.check_A_connector).start() |
| self.attempts += 1 |
| |
| def test_4_remove_router_B_connect_to_my_server(self): |
| """ |
| This test kills Router B again and makes sure that Router A now connects to our custom server that |
| accepts connections. This custom server intentionally sends an empty list for failover-server-list |
| Router A must look at this empty list and wipe out all failover information except the original connector information |
| and the current connection info. |
| """ |
| |
| |
| # Start MyServer |
| proc = FailoverTest.tester.popen( |
| ['/usr/bin/env', '${PY_STRING}', os.path.join(os.path.dirname(os.path.abspath(__file__)), 'failoverserver.py'), '-a', |
| 'amqp://127.0.0.1:%d' % FailoverTest.my_server_port], expect=Process.RUNNING) |
| |
| # Kill the router B again |
| FailoverTest.routers[3].teardown() |
| |
| self.success = False |
| self.attempts = 0 |
| |
| self.schedule_B_to_my_server_failover_test() |
| |
| while not self.can_terminate(): |
| pass |
| |
| self.assertTrue(self.success) |
| |
| |
| if __name__ == '__main__': |
| unittest.main(main_module()) |