| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import unittest |
| from time import sleep |
| from subprocess import PIPE, STDOUT |
| |
| from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process |
| |
| from proton import Message, Endpoint |
| from proton.handlers import MessagingHandler |
| from proton.reactor import AtMostOnce, Container |
| from proton.utils import BlockingConnection, LinkDetached |
| |
| from qpid_dispatch.management.client import Node |
| |
| class LinkRoutePatternTest(TestCase): |
| """ |
| Tests the linkRoutePattern property of the dispatch router. |
| |
| Sets up 3 routers (one of which is acting as a broker(QDR.A)). 2 routers have linkRoutePattern set to 'org.apache.' |
| (please see configs in the setUpClass method to get a sense of how the routers and their connections are configured) |
| The tests in this class send and receive messages across this network of routers to link routable addresses. |
| Uses the Python Blocking API to send/receive messages. The blocking api plays neatly into the synchronous nature |
| of system tests. |
| |
| QDR.A acting broker |
| +---------+ +---------+ +---------+ +-----------------+ |
| | | <------ | | <----- | |<----| blocking_sender | |
| | QDR.A | | QDR.B | | QDR.C | +-----------------+ |
| | | ------> | | ------> | | +-------------------+ |
| +---------+ +---------+ +---------+---->| blocking_receiver | |
| +-------------------+ |
| """ |
| @classmethod |
| def get_router(cls, index): |
| return cls.routers[index] |
| |
| @classmethod |
| def setUpClass(cls): |
| """Start three routers""" |
| super(LinkRoutePatternTest, cls).setUpClass() |
| |
| def router(name, connection): |
| |
| config = [ |
| ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}), |
| ('fixedAddress', {'prefix': '/closest/', 'fanout': 'single', 'bias': 'closest'}), |
| ('fixedAddress', {'prefix': '/spread/', 'fanout': 'single', 'bias': 'spread'}), |
| ('fixedAddress', {'prefix': '/multicast/', 'fanout': 'multiple'}), |
| ('fixedAddress', {'prefix': '/', 'fanout': 'multiple'}), |
| |
| ] + connection |
| |
| config = Qdrouterd.Config(config) |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=False)) |
| |
| cls.routers = [] |
| a_listener_port = cls.tester.get_port() |
| b_listener_port = cls.tester.get_port() |
| c_listener_port = cls.tester.get_port() |
| test_tag_listener_port = cls.tester.get_port() |
| |
| router('A', |
| [ |
| ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| ]) |
| router('B', |
| [ |
| ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| ('listener', {'name': 'test-tag', 'role': 'route-container', 'host': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| |
| # This is an on-demand connection made from QDR.B's ephemeral port to a_listener_port |
| ('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| # Only inter router communication must happen on 'inter-router' connectors. This connector makes |
| # a connection from the router B's ephemeral port to c_listener_port |
| ('connector', {'role': 'inter-router', 'host': '0.0.0.0', 'port': c_listener_port}), |
| ('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'}), |
| ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'in'}), |
| ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'out'}) |
| ] |
| ) |
| router('C', |
| [ |
| ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': c_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| # The client will exclusively use the following listener to connect to QDR.C |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}), |
| # Note here that the linkRoutePattern is set to org.apache. which makes it backward compatible. |
| # The dot(.) at the end is ignored by the address hashing scheme. |
| ('linkRoutePattern', {'prefix': 'org.apache.'}), |
| ('linkRoute', {'prefix': 'pulp.task', 'dir': 'in'}), |
| ('linkRoute', {'prefix': 'pulp.task', 'dir': 'out'}) |
| ] |
| ) |
| |
| # Wait for the routers to locate each other |
| cls.routers[1].wait_router_connected('QDR.C') |
| cls.routers[2].wait_router_connected('QDR.B') |
| |
| # This is not a classic router network in the sense that one router is acting as a broker. We allow a little |
| # bit more time for the routers to stabilize. |
| sleep(2) |
| |
| def run_qdstat_linkRoute(self, address): |
| p = self.popen( |
| ['qdstat', '--bus', str(address), '--timeout', str(TIMEOUT) ] + ['--linkroute'], |
| name='qdstat-'+self.id(), stdout=PIPE, expect=None) |
| |
| out = p.communicate()[0] |
| assert p.returncode == 0, "qdstat exit status %s, output:\n%s" % (p.returncode, out) |
| return out |
| |
| def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): |
| p = self.popen( |
| ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], |
| stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect) |
| out = p.communicate(input)[0] |
| try: |
| p.teardown() |
| except Exception, e: |
| raise Exception("%s\n%s" % (e, out)) |
| return out |
| |
| def test_aaa_qdmanage_query_link_route(self): |
| """ |
| qdmanage converts short type to long type and this test specifically tests if qdmanage is actually doing |
| the type conversion correctly by querying with short type and long type. |
| """ |
| cmd = 'QUERY --type=linkRoute' |
| out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) |
| |
| # Make sure there is a dir of in and out. |
| self.assertTrue('"dir": "in"' in out) |
| self.assertTrue('"dir": "out"' in out) |
| self.assertTrue('"connection": "broker"' in out) |
| |
| # Use the long type and make sure that qdmanage does not mess up the long type |
| cmd = 'QUERY --type=org.apache.qpid.dispatch.router.config.linkRoute' |
| out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) |
| |
| # Make sure there is a dir of in and out. |
| self.assertTrue('"dir": "in"' in out) |
| self.assertTrue('"dir": "out"' in out) |
| self.assertTrue('"connection": "broker"' in out) |
| |
| def test_bbb_qdstat_link_routes_routerB(self): |
| """ |
| Runs qdstat on router B to make sure that router B has two link routes, one 'in' and one 'out' |
| |
| """ |
| out = self.run_qdstat_linkRoute(self.routers[1].addresses[0]) |
| out_list = out.split() |
| self.assertEqual(out_list.count('in'), 2) |
| self.assertEqual(out_list.count('out'), 2) |
| |
| def test_ccc_qdstat_link_routes_routerC(self): |
| """ |
| Runs qdstat on router C to make sure that router C has two link routes, one 'in' and one 'out' |
| |
| """ |
| out = self.run_qdstat_linkRoute(self.routers[2].addresses[1]) |
| out_list = out.split() |
| |
| self.assertEqual(out_list.count('in'), 2) |
| self.assertEqual(out_list.count('out'), 2) |
| |
| def test_ddd_partial_link_route_match(self): |
| """ |
| The linkRoutePattern on Routers C and B is set to org.apache. |
| Creates a receiver listening on the address 'org.apache.dev' and a sender that sends to address 'org.apache.dev'. |
| Sends a message to org.apache.dev via router QDR.C and makes sure that the message was successfully |
| routed (using partial address matching) and received using pre-created links that were created as a |
| result of specifying addresses in the linkRoutePattern attribute('org.apache.'). |
| """ |
| hello_world_1 = "Hello World_1!" |
| |
| # Connects to listener #2 on QDR.C |
| addr = self.routers[2].addresses[1] |
| |
| blocking_connection = BlockingConnection(addr) |
| |
| # Receive on org.apache.dev |
| blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev") |
| |
| apply_options = AtMostOnce() |
| |
| # Sender to org.apache.dev |
| blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options) |
| msg = Message(body=hello_world_1) |
| # Send a message |
| blocking_sender.send(msg) |
| |
| received_message = blocking_receiver.receive() |
| |
| self.assertEqual(hello_world_1, received_message.body) |
| |
| # Connect to the router acting like the broker (QDR.A) and check the deliveriesIngress and deliveriesEgress |
| local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT) |
| self.assertEqual(u'QDR.A', local_node.query(type='org.apache.qpid.dispatch.router', |
| attribute_names=['id']).results[0][0]) |
| |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache.dev').deliveriesEgress) |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache.dev').deliveriesIngress) |
| |
| # There should be 4 links - |
| # 1. outbound receiver link on org.apache.dev |
| # 2. inbound sender link on blocking_sender |
| # 3. inbound link to the $management |
| # 4. outbound link to $management |
| # self.assertEqual(4, len() |
| self.assertEquals(4, len(local_node.query(type='org.apache.qpid.dispatch.router.link').results)) |
| |
| blocking_connection.close() |
| |
| def test_partial_link_route_match_1(self): |
| """ |
| This test is pretty much the same as the previous test (test_partial_link_route_match) but the connection is |
| made to router QDR.B instead of QDR.C and we expect to see the same behavior. |
| """ |
| hello_world_2 = "Hello World_2!" |
| addr = self.routers[1].addresses[0] |
| |
| blocking_connection = BlockingConnection(addr) |
| |
| # Receive on org.apache.dev |
| blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev") |
| |
| apply_options = AtMostOnce() |
| |
| # Sender to to org.apache.dev |
| blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options) |
| msg = Message(body=hello_world_2) |
| # Send a message |
| blocking_sender.send(msg) |
| |
| received_message = blocking_receiver.receive() |
| |
| self.assertEqual(hello_world_2, received_message.body) |
| |
| local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT) |
| |
| # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms |
| # that the message was link routed |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache.dev').deliveriesEgress) |
| |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache.dev').deliveriesIngress) |
| |
| blocking_connection.close() |
| |
| def test_full_link_route_match(self): |
| """ |
| The linkRoutePattern on Routers C and B is set to org.apache. |
| Creates a receiver listening on the address 'org.apache' and a sender that sends to address 'org.apache'. |
| Sends a message to org.apache via router QDR.C and makes sure that the message was successfully |
| routed (using full address matching) and received using pre-created links that were created as a |
| result of specifying addresses in the linkRoutePattern attribute('org.apache.'). |
| """ |
| hello_world_3 = "Hello World_3!" |
| # Connects to listener #2 on QDR.C |
| addr = self.routers[2].addresses[1] |
| |
| blocking_connection = BlockingConnection(addr) |
| |
| # Receive on org.apache |
| blocking_receiver = blocking_connection.create_receiver(address="org.apache") |
| |
| apply_options = AtMostOnce() |
| |
| # Sender to to org.apache |
| blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options) |
| msg = Message(body=hello_world_3) |
| # Send a message |
| blocking_sender.send(msg) |
| |
| received_message = blocking_receiver.receive() |
| |
| self.assertEqual(hello_world_3, received_message.body) |
| |
| local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT) |
| |
| # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms |
| # that the message was link routed |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache').deliveriesEgress) |
| |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache').deliveriesIngress) |
| |
| blocking_connection.close() |
| |
| def test_full_link_route_match_1(self): |
| """ |
| This test is pretty much the same as the previous test (test_full_link_route_match) but the connection is |
| made to router QDR.B instead of QDR.C and we expect the message to be link routed successfully. |
| """ |
| hello_world_4 = "Hello World_4!" |
| addr = self.routers[2].addresses[0] |
| |
| blocking_connection = BlockingConnection(addr) |
| |
| # Receive on org.apache |
| blocking_receiver = blocking_connection.create_receiver(address="org.apache") |
| |
| apply_options = AtMostOnce() |
| |
| # Sender to to org.apache |
| blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options) |
| |
| msg = Message(body=hello_world_4) |
| # Send a message |
| blocking_sender.send(msg) |
| |
| received_message = blocking_receiver.receive() |
| |
| self.assertEqual(hello_world_4, received_message.body) |
| |
| local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT) |
| |
| # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms |
| # that the message was link routed |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache').deliveriesEgress) |
| |
| self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', |
| name='M0org.apache').deliveriesIngress) |
| |
| blocking_connection.close() |
| |
| def test_zzz_qdmanage_delete_link_route(self): |
| """ |
| We are deleting the link route using qdmanage short name. This should be the last test to run |
| """ |
| |
| # First delete linkRoutes on QDR.B |
| local_node = Node.connect(self.routers[1].addresses[0], timeout=TIMEOUT) |
| result_list = local_node.query(type='org.apache.qpid.dispatch.router.config.linkRoute').results |
| |
| identity_1 = result_list[0][1] |
| identity_2 = result_list[1][1] |
| identity_3 = result_list[2][1] |
| identity_4 = result_list[3][1] |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_1 |
| self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_2 |
| self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_3 |
| self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_4 |
| self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) |
| |
| cmd = 'QUERY --type=linkRoute' |
| out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) |
| self.assertEquals(out.rstrip(), '[]') |
| |
| # linkRoutes now gone on QDR.B but remember that it still exist on QDR.C |
| # We will now try to create a receiver on address org.apache.dev on QDR.C. |
| # Since the linkRoute on QDR.B is gone, QDR.C |
| # will not allow a receiver to be created since there is no route to destination. |
| |
| # Connects to listener #2 on QDR.C |
| addr = self.routers[2].addresses[1] |
| |
| # Now delete linkRoutes on QDR.C to eradicate linkRoutes completely |
| local_node = Node.connect(addr, timeout=TIMEOUT) |
| result_list = local_node.query(type='org.apache.qpid.dispatch.router.config.linkRoute').results |
| |
| identity_1 = result_list[0][1] |
| identity_2 = result_list[1][1] |
| identity_3 = result_list[2][1] |
| identity_4 = result_list[3][1] |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_1 |
| self.run_qdmanage(cmd=cmd, address=addr) |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_2 |
| self.run_qdmanage(cmd=cmd, address=addr) |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_3 |
| self.run_qdmanage(cmd=cmd, address=addr) |
| |
| cmd = 'DELETE --type=linkRoute --identity=' + identity_4 |
| self.run_qdmanage(cmd=cmd, address=addr) |
| |
| cmd = 'QUERY --type=linkRoute' |
| out = self.run_qdmanage(cmd=cmd, address=addr) |
| self.assertEquals(out.rstrip(), '[]') |
| |
| blocking_connection = BlockingConnection(addr, timeout=3) |
| |
| # Receive on org.apache.dev (this address used to be linkRouted but not anymore since we deleted linkRoutes |
| # on both QDR.C and QDR.B) |
| blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev") |
| |
| apply_options = AtMostOnce() |
| hello_world_1 = "Hello World_1!" |
| # Sender to org.apache.dev |
| blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options) |
| msg = Message(body=hello_world_1) |
| |
| # Send a message |
| blocking_sender.send(msg) |
| received_message = blocking_receiver.receive(timeout=5) |
| self.assertEqual(hello_world_1, received_message.body) |
| |
| def test_yyy_delivery_tag(self): |
| """ |
| Tests that the router carries over the delivery tag on a link routed delivery |
| """ |
| listening_address = self.routers[1].addresses[1] |
| sender_address = self.routers[2].addresses[1] |
| qdstat_address = self.routers[2].addresses[1] |
| test = DeliveryTagsTest(sender_address, listening_address, qdstat_address) |
| test.run() |
| self.assertTrue(test.message_received) |
| self.assertTrue(test.delivery_tag_verified) |
| |
| def test_close_with_unsettled(self): |
| test = CloseWithUnsettledTest(self.routers[1].addresses[0], self.routers[1].addresses[1]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| |
| class DeliveryTagsTest(MessagingHandler): |
| def __init__(self, sender_address, listening_address, qdstat_address): |
| super(DeliveryTagsTest, self).__init__() |
| self.sender_address = sender_address |
| self.listening_address = listening_address |
| self.sender = None |
| self.message_received = False |
| self.receiver_connection = None |
| self.sender_connection = None |
| self.qdstat_address = qdstat_address |
| self.id = '1235' |
| self.times = 1 |
| self.delivery_tag_verified = False |
| # The delivery tag we are going to send in the transfer frame |
| # We will later make sure that the same delivery tag shows up on the receiving end in the link routed case. |
| self.delivery_tag = '92319' |
| |
| def on_start(self, event): |
| self.receiver_connection = event.container.connect(self.listening_address) |
| |
| def on_connection_remote_open(self, event): |
| if event.connection == self.receiver_connection: |
| continue_loop = True |
| # Dont open the sender connection unless we can make sure that there is a remote receiver ready to |
| # accept the message. |
| # If there is no remote receiver, the router will throw a 'No route to destination' error when |
| # creating sender connection. |
| # The following loops introduces a wait before creating the sender connection. It gives time to the |
| # router so that the address Dpulp.task can show up on the remoteCount |
| i = 0 |
| while continue_loop: |
| if i > 100: # If we have run the read command for more than hundred times and we still do not have |
| # the remoteCount set to 1, there is a problem, just exit out of the function instead |
| # of looping to infinity. |
| self.receiver_connection.close() |
| return |
| local_node = Node.connect(self.qdstat_address, timeout=TIMEOUT) |
| out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount |
| if out == 1: |
| continue_loop = False |
| i+=1 |
| |
| self.sender_connection = event.container.connect(self.sender_address) |
| self.sender = event.container.create_sender(self.sender_connection, "pulp.task", options=AtMostOnce()) |
| |
| def on_sendable(self, event): |
| if self.times == 1: |
| msg = Message(body="Hello World") |
| self.sender.send(msg, tag=self.delivery_tag) |
| self.sender_connection.close() |
| self.times +=1 |
| |
| def on_message(self, event): |
| if "Hello World" == event.message.body: |
| self.message_received = True |
| |
| # If the tag on the delivery is the same as the tag we sent with the initial transfer, it means |
| # that the router has propagated the delivery tag successfully because of link routing. |
| if self.delivery_tag == event.delivery.tag: |
| self.delivery_tag_verified = True |
| self.receiver_connection.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class Timeout(object): |
| def __init__(self, parent): |
| self.parent = parent |
| |
| def on_timer_task(self, event): |
| self.parent.timeout() |
| |
| |
| class CloseWithUnsettledTest(MessagingHandler): |
| ## |
| ## This test sends a message across an attach-routed link. While the message |
| ## is unsettled, the client link is closed. The test is ensuring that the |
| ## router does not crash during the closing of the links. |
| ## |
| def __init__(self, normal_addr, route_addr): |
| super(CloseWithUnsettledTest, self).__init__(prefetch=0, auto_accept=False) |
| self.normal_addr = normal_addr |
| self.route_addr = route_addr |
| self.dest = "pulp.task.CWUtest" |
| self.error = None |
| |
| def timeout(self): |
| self.error = "Timeout Expired - Check for cores" |
| self.conn_normal.close() |
| self.conn_route.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(5, Timeout(self)) |
| self.conn_route = event.container.connect(self.route_addr) |
| |
| def on_connection_opened(self, event): |
| if event.connection == self.conn_route: |
| self.conn_normal = event.container.connect(self.normal_addr) |
| elif event.connection == self.conn_normal: |
| self.sender = event.container.create_sender(self.conn_normal, self.dest) |
| |
| def on_connection_closed(self, event): |
| self.conn_route.close() |
| self.timer.cancel() |
| |
| def on_link_opened(self, event): |
| if event.receiver: |
| self.receiver = event.receiver |
| self.receiver.flow(1) |
| |
| def on_sendable(self, event): |
| msg = Message(body="CloseWithUnsettled") |
| event.sender.send(msg) |
| |
| def on_message(self, event): |
| self.conn_normal.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| |
| if __name__ == '__main__': |
| unittest.main(main_module()) |