blob: d7ff770fc7af8410cb2cf39d57150a6542cb454a [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import os
from threading import Timer
import json
import re
from system_test import main_module, TIMEOUT
from system_test import TestCase, Qdrouterd, Process, TIMEOUT
from system_test import unittest
from subprocess import PIPE, STDOUT
class FailoverTest(TestCase):
    """
    System test for failover-list handling on an inter-router connector.

    Three interior routers are started in setUpClass: B (advertises 'failoverUrls' on its
    inter-router listener), A (connects to B through 'connectorToB') and C (listens on the backup
    port that B advertises). The numbered tests are written to run in numeric order and build on
    each other: they kill and restart routers and then poll the connector entity on Router A until
    its 'failoverUrls' attribute equals the expected, order-sensitive, list.
    """

    inter_router_port = None

    # Management entity type queried to read the failover list.
    connector_type = 'org.apache.qpid.dispatch.connector'

    # Seconds the test thread sleeps between looks at the polling state.
    poll_interval = 0.1

    @classmethod
    def router(cls, name, config):
        """
        Start a router and append it to cls.routers.

        :param name: router name handed to cls.tester.qdrouterd.
        :param config: list of (entity, attributes) tuples; wrapped in a Qdrouterd.Config.
        """
        config = Qdrouterd.Config(config)
        cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

    @classmethod
    def setUpClass(cls):
        """Allocate the ports, start routers B, A and C (in that order) and wait for A to see B."""
        super(FailoverTest, cls).setUpClass()
        cls.routers = []
        cls.inter_router_port = cls.tester.get_port()
        # NOTE(review): inter_router_port_1 is not referenced anywhere else in this class.
        cls.inter_router_port_1 = cls.tester.get_port()
        cls.backup_port = cls.tester.get_port()
        cls.backup_url = 'amqp://0.0.0.0:' + str(cls.backup_port)
        cls.my_server_port = cls.tester.get_port()
        cls.failover_list = 'amqp://third-host:5671, ' + cls.backup_url

        #
        # Router A tries to connect to Router B via its connectorToB. Router B responds with an open frame which
        # will have the failover-server-list as one of its connection properties like the following -
        # [0x13024d0]:0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767,
        # idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY",
        # properties={:product="qpid-dispatch-router", :version="1.0.0",
        # :"failover-server-list"=[{:"network-host"="some-host", :port="35000"},
        # {:"network-host"="0.0.0.0", :port="25000"}]}]
        #
        # The suite of tests determines if the router receiving this open frame stores it properly and, if the
        # original connection goes down, checks that the router is trying to make connections to the failover urls.
        #
        FailoverTest.router('B', [
            ('router', {'mode': 'interior', 'id': 'B'}),
            ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': cls.inter_router_port,
                          'failoverUrls': cls.failover_list}),
            ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}),
        ])

        FailoverTest.router('A', [
            ('router', {'mode': 'interior', 'id': 'A'}),
            ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}),
            ('connector', {'name': 'connectorToB', 'role': 'inter-router',
                           'port': cls.inter_router_port}),
        ])

        FailoverTest.router('C', [
            ('router', {'mode': 'interior', 'id': 'C'}),
            ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': cls.backup_port}),
            ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}),
        ])

        # routers[1] is Router A (second one started above).
        cls.routers[1].wait_router_connected('B')

    def __init__(self, test_method):
        TestCase.__init__(self, test_method)
        self.success = False
        self.timer_delay = 2
        self.max_attempts = 10
        self.attempts = 0
        # True while a timer-driven check is armed or running; lets can_terminate() wait for the
        # final attempt instead of giving up the moment the attempt counter reaches its limit.
        self.check_pending = False
        # Diagnostics from the most recent poll, reported if the test fails.
        self.last_observed = None
        self.last_error = None

    def address(self):
        """Return the first address of Router A (routers[1]), the router the tests query."""
        return self.routers[1].addresses[0]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        """
        Run qdmanage and return its combined stdout/stderr output.

        :param cmd: space separated qdmanage arguments, e.g. 'QUERY --type=...'.
        :param input: optional text fed to the process on stdin.
        :param expect: expected exit status, passed through to self.popen.
        :param address: bus address to use; defaults to self.address().
        :raises Exception: if tearing down the process fails; the message carries the output.
        """
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1',
                                             '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def run_qdstat(self, args, regexp=None, address=None):
        """
        Run qdstat, assert that it exits with status 0 and return its stdout.

        :param args: list of extra qdstat command line arguments.
        :param regexp: optional pattern that must be found (case-insensitively) in the output.
        :param address: bus address to use; defaults to self.address().
        """
        # The fallback used to be self.router.addresses[0], but 'router' is the classmethod that
        # starts routers and has no 'addresses'; use the same default as run_qdmanage instead.
        p = self.popen(
            ['qdstat', '--bus', str(address or self.address()), '--timeout', str(TIMEOUT)] + args,
            name='qdstat-' + self.id(), stdout=PIPE, expect=None,
            universal_newlines=True)
        out = p.communicate()[0]
        assert p.returncode == 0, \
            "qdstat exit status %s, output:\n%s" % (p.returncode, out)
        if regexp:
            assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % (regexp, out)
        return out

    def _connector_failover_urls(self):
        """Query Router A's connectors and return 'failoverUrls' of the first one (or None)."""
        query_command = 'QUERY --type=' + self.connector_type
        output = json.loads(self.run_qdmanage(query_command, address=self.address()))
        return output[0].get('failoverUrls')

    def _schedule_check(self, check):
        """Arm a one-shot timer that runs *check*, unless we succeeded or ran out of attempts."""
        if self.success or self.attempts >= self.max_attempts:
            self.check_pending = False
            return
        # Update the bookkeeping before the timer is started so the waiting thread never sees
        # the attempt limit reached while the last check is still outstanding.
        self.attempts += 1
        self.check_pending = True
        timer = Timer(self.timer_delay, check)
        # Daemon thread: an outstanding check must not keep the interpreter alive.
        timer.daemon = True
        timer.start()

    def _check_failover_urls(self, expected, reschedule):
        """
        Compare Router A's failoverUrls with *expected*; on a mismatch call *reschedule*.

        This runs in the timer thread. An exception escaping here would be lost and nothing
        would ever reschedule or finish the poll, so any error is recorded and treated as a
        failed attempt.
        """
        matched = False
        try:
            self.last_observed = self._connector_failover_urls()
            matched = self.last_observed == expected
        except Exception as error:
            self.last_error = error
        if matched:
            self.success = True
            self.check_pending = False
        else:
            reschedule()

    def _wait_for_checks(self):
        """Block the test thread until polling has succeeded or given up."""
        import time  # function-scope import keeps this class self-contained
        while not self.can_terminate():
            # Sleep instead of spinning so the routers under test are not starved of CPU.
            time.sleep(self.poll_interval)

    def _failure_details(self):
        """Build the message shown when polling ends without a match."""
        return "failoverUrls never matched; last observed: %s; last error: %s" % (
            self.last_observed, self.last_error)

    def test_1_connector_has_failover_list(self):
        """
        This is the most simple and straightforward case. Router A connects to Router B. Router B sends
        failover information to Router A.
        We make a qdmanage connector query to Router A which checks if Router A is storing the failover
        information received from Router B. The failover list must consist of the original connection info
        (from the connector) followed by the two items sent by Router B (stored in cls.failover_list).
        The 'failoverUrls' is comma separated.
        """
        expected = "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", " + FailoverTest.failover_list
        self.assertEqual(expected, self._connector_failover_urls())

    def schedule_B_to_C_failover_test(self):
        """Schedule check_C_connector after timer_delay seconds, if attempts remain."""
        self._schedule_check(self.check_C_connector)

    def check_C_connector(self):
        """Succeed once Router A lists the backup url (Router C) first in its failoverUrls."""
        expected = FailoverTest.backup_url + ", " + "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) \
            + ", " + "amqp://third-host:5671"
        self._check_failover_urls(expected, self.schedule_B_to_C_failover_test)

    def can_terminate(self):
        """
        Return True when polling is over: either a check succeeded, or all attempts were used
        and no check is still armed or running.
        """
        if self.success:
            return True
        return self.attempts >= self.max_attempts and not self.check_pending

    def test_2_remove_router_B(self):
        """
        In this test, we are killing Router B. As a result, Router A should try to connect to Router C.
        Router C does NOT have a failover list, so the open frame that Router C sends to Router A will not contain
        the failover-server-list property. Hence the failoverUrls list will remain unchanged except that the
        order of the URLs would be different.
        """
        # First make sure there are no inter-router connections on router C
        outs = self.run_qdstat(['--connections'], address=self.routers[2].addresses[1])
        self.assertFalse('inter-router' in outs)

        # Kill the router B
        FailoverTest.routers[0].teardown()

        # Poll Router A until its failoverUrls show the backup url (Router C) in first position.
        self.schedule_B_to_C_failover_test()
        self._wait_for_checks()
        self.assertTrue(self.success, self._failure_details())

    def schedule_C_to_B_failover_test(self):
        """Schedule check_B_connector after timer_delay seconds, if attempts remain."""
        self._schedule_check(self.check_B_connector)

    def check_B_connector(self):
        """Succeed once Router A's failoverUrls show it is back on Router B with B's new list."""
        # Router A should now try to connect to Router B again since we killed Router C.
        # The order that the URLs appear in the failoverUrls is important. This is the order in which the router
        # will attempt to make connections in case the existing connection goes down.
        expected = "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", " + \
            FailoverTest.failover_list + \
            ', amqp://127.0.0.1:%d' % FailoverTest.my_server_port
        self._check_failover_urls(expected, self.schedule_C_to_B_failover_test)

    def test_3_reinstate_router_B(self):
        """
        In this test, we are restarting Router B and killing Router C. Router A should now try to connect back to
        Router B since it maintains the original connection info to Router B from the connector config information.
        Before starting Router B back again, we have a small config change to Router B wherein we are adding a new
        failover url to the original list. This new failover url points to our own server which will accept
        connections. This server will actually be used in the next test but this test makes sure that the new
        server url also shows up in the failoverUrls list.
        """
        FailoverTest.router('B', [
            ('router', {'mode': 'interior', 'id': 'B'}),
            ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': FailoverTest.inter_router_port,
                          'failoverUrls': FailoverTest.failover_list +
                          ', amqp://127.0.0.1:%d' % FailoverTest.my_server_port}),
            ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': FailoverTest.tester.get_port()}),
        ])
        # The restarted Router B is the fourth entry appended to the routers list.
        FailoverTest.routers[3].wait_ready()

        # Kill the router C.
        # Now since Router B is up and running, router A should try to re-connect to Router B.
        # This will prove that the router A is preserving the original connector information specified in its
        # config.
        FailoverTest.routers[2].teardown()

        self.success = False
        self.attempts = 0

        # Schedule a test to make sure that the failover url is available
        self.schedule_C_to_B_failover_test()
        self._wait_for_checks()
        self.assertTrue(self.success, self._failure_details())

    def check_A_connector(self):
        """Succeed once Router A lists our own server first, followed only by the connector url."""
        # Router A should now be connected to our own server since we killed Router B. The server sends an
        # empty failover list, so only the current connection and the original connector info remain.
        # The order that the URLs appear in the failoverUrls is important. This is the order in which the router
        # will attempt to make connections in case the existing connection goes down.
        expected = 'amqp://127.0.0.1:%d' % FailoverTest.my_server_port + ", " + \
            "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port)
        self._check_failover_urls(expected, self.schedule_B_to_my_server_failover_test)

    def schedule_B_to_my_server_failover_test(self):
        """Schedule check_A_connector after timer_delay seconds, if attempts remain."""
        self._schedule_check(self.check_A_connector)

    def test_4_remove_router_B_connect_to_my_server(self):
        """
        This test kills Router B again and makes sure that Router A now connects to our custom server that
        accepts connections. This custom server intentionally sends an empty list for failover-server-list.
        Router A must look at this empty list and wipe out all failover information except the original connector
        information and the current connection info.
        """
        # Start MyServer
        # NOTE(review): '${PY_STRING}' looks like a build-time template placeholder - confirm it is
        # substituted before this file is run.
        FailoverTest.tester.popen(
            ['/usr/bin/env', '${PY_STRING}',
             os.path.join(os.path.dirname(os.path.abspath(__file__)), 'failoverserver.py'), '-a',
             'amqp://127.0.0.1:%d' % FailoverTest.my_server_port], expect=Process.RUNNING)

        # Kill the router B again
        FailoverTest.routers[3].teardown()

        self.success = False
        self.attempts = 0

        self.schedule_B_to_my_server_failover_test()
        self._wait_for_checks()
        self.assertTrue(self.success, self._failure_details())
if __name__ == "__main__":
    # Executed as a script: run the tests, handing unittest.main() whatever
    # main_module() (imported from system_test) returns.
    module_under_test = main_module()
    unittest.main(module_under_test)