blob: 6f3ca61648dd89c3a8fc96ce4552c2f942c958b3 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
from time import sleep
from subprocess import PIPE
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
from proton import Message
from proton.reactor import AtMostOnce
from proton.utils import BlockingConnection, LinkDetached
from qpid_dispatch.management.client import Node
class LinkRoutePatternTest(TestCase):
"""
Tests the linkRoutePattern property of the dispatch router.
Sets up 3 routers (one of which is acting as a broker(QDR.A)). 2 routers have linkRoutePattern set to 'org.apache.'
(please see configs in the setUpClass method to get a sense of how the routers and their connections are configured)
The tests in this class send and receive messages across this network of routers to link routable addresses.
Uses the Python Blocking API to send/receive messages. The blocking api plays neatly into the synchronous nature
of system tests.
QDR.A acting broker
+---------+ +---------+ +---------+ +-----------------+
| | <------ | | <----- | |<----| blocking_sender |
| QDR.A | | QDR.B | | QDR.C | +-----------------+
| | ------> | | ------> | | +-------------------+
+---------+ +---------+ +---------+---->| blocking_receiver |
+-------------------+
"""
@classmethod
def get_router(cls, index):
return cls.routers[index]
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(LinkRoutePatternTest, cls).setUpClass()
def router(name, connection):
config = [
('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.%s'%name}),
('router', {'mode': 'interior', 'routerId': 'QDR.%s'%name}),
('fixedAddress', {'prefix': '/closest/', 'fanout': 'single', 'bias': 'closest'}),
('fixedAddress', {'prefix': '/spread/', 'fanout': 'single', 'bias': 'spread'}),
('fixedAddress', {'prefix': '/multicast/', 'fanout': 'multiple'}),
('fixedAddress', {'prefix': '/', 'fanout': 'multiple'}),
] + connection
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
a_listener_port = cls.tester.get_port()
b_listener_port = cls.tester.get_port()
c_listener_port = cls.tester.get_port()
router('A',
[
('listener', {'role': 'normal', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
])
router('B',
[
('listener', {'role': 'normal', 'addr': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
# This is an on-demand connection made from QDR.B's ephemeral port to a_listener_port
('connector', {'name': 'broker', 'role': 'on-demand', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
# Only inter router communication must happen on 'inter-router' connectors. This connector makes
# a connection from the router B's ephemeral port to c_listener_port
('connector', {'role': 'inter-router', 'addr': '0.0.0.0', 'port': c_listener_port}),
('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'})
]
)
router('C',
[
('listener', {'addr': '0.0.0.0', 'role': 'inter-router', 'port': c_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
# The client will exclusively use the following listener to connect to QDR.C
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}),
# Note here that the linkRoutePattern is set to org.apache. which makes it backward compatible.
# The dot(.) at the end is ignored by the address hashing scheme.
('linkRoutePattern', {'prefix': 'org.apache.'})
]
)
# Wait for the routers to locate each other
cls.routers[1].wait_router_connected('QDR.C')
cls.routers[2].wait_router_connected('QDR.B')
# This is not a classic router network in the sense that one router is acting as a broker. We allow a little
# bit more time for the routers to stabilize.
sleep(2)
def run_qdstat_linkRoute(self, address):
p = self.popen(
['qdstat', '--bus', str(address), '--timeout', str(TIMEOUT) ] + ['--linkroute'],
name='qdstat-'+self.id(), stdout=PIPE, expect=None)
out = p.communicate()[0]
assert p.returncode == 0, "qdstat exit status %s, output:\n%s" % (p.returncode, out)
return out
def test_bbb_qdstat_link_routes_routerB(self):
"""
Runs qdstat on router B to make sure that router B has two link routes, one 'in' and one 'out'
"""
out = self.run_qdstat_linkRoute(self.routers[1].addresses[0])
out_list = out.split()
self.assertEqual(out_list.count('in'), 1)
self.assertEqual(out_list.count('out'), 1)
def test_ccc_qdstat_link_routes_routerC(self):
"""
Runs qdstat on router C to make sure that router C has two link routes, one 'in' and one 'out'
"""
out = self.run_qdstat_linkRoute(self.routers[2].addresses[1])
out_list = out.split()
self.assertEqual(out_list.count('in'), 1)
self.assertEqual(out_list.count('out'), 1)
def test_ddd_partial_link_route_match(self):
"""
The linkRoutePattern on Routers C and B is set to org.apache.
Creates a receiver listening on the address 'org.apache.dev' and a sender that sends to address 'org.apache.dev'.
Sends a message to org.apache.dev via router QDR.C and makes sure that the message was successfully
routed (using partial address matching) and received using pre-created links that were created as a
result of specifying addresses in the linkRoutePattern attribute('org.apache.').
"""
hello_world_1 = "Hello World_1!"
# Connects to listener #2 on QDR.C
addr = self.routers[2].addresses[1]
blocking_connection = BlockingConnection(addr)
# Receive on org.apache.dev
blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev")
apply_options = AtMostOnce()
# Sender to to org.apache.dev
blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options)
msg = Message(body=hello_world_1)
# Send a message
blocking_sender.send(msg)
received_message = blocking_receiver.receive()
self.assertEqual(hello_world_1, received_message.body)
# Connect to the router acting like the broker (QDR.A) and check the deliveriesIngress and deliveriesEgress
local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
self.assertEqual(u'QDR.A', local_node.query(type='org.apache.qpid.dispatch.router',
attribute_names=['routerId']).results[0][0])
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache.dev').deliveriesEgress,
"deliveriesEgress is wrong")
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache.dev').deliveriesIngress,
"deliveriesIngress is wrong")
# There should be 4 links -
# 1. outbound receiver link on org.apache.dev
# 2. inbound sender link on blocking_sender
# 3. inbound link to the $management
# 4. outbound link to $management
# self.assertEqual(4, len()
self.assertEquals(4, len(local_node.query(type='org.apache.qpid.dispatch.router.link').results))
#blocking_receiver.close()
blocking_connection.close()
def test_partial_link_route_match_1(self):
"""
This test is pretty much the same as the previous test (test_partial_link_route_match) but the connection is
made to router QDR.B instead of QDR.C and we expect to see the same behavior.
"""
hello_world_2 = "Hello World_2!"
addr = self.routers[1].addresses[0]
blocking_connection = BlockingConnection(addr)
# Receive on org.apache.dev
blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev")
apply_options = AtMostOnce()
# Sender to to org.apache.dev
blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options)
msg = Message(body=hello_world_2)
# Send a message
blocking_sender.send(msg)
received_message = blocking_receiver.receive()
self.assertEqual(hello_world_2, received_message.body)
local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
# Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
# that the message was link routed
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache.dev').deliveriesEgress,
"deliveriesEgress is wrong")
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache.dev').deliveriesIngress,
"deliveriesIngress is wrong")
#blocking_receiver.close()
blocking_connection.close()
def test_full_link_route_match(self):
"""
The linkRoutePattern on Routers C and B is set to org.apache.
Creates a receiver listening on the address 'org.apache' and a sender that sends to address 'org.apache'.
Sends a message to org.apache via router QDR.C and makes sure that the message was successfully
routed (using full address matching) and received using pre-created links that were created as a
result of specifying addresses in the linkRoutePattern attribute('org.apache.').
"""
hello_world_3 = "Hello World_3!"
# Connects to listener #2 on QDR.C
addr = self.routers[2].addresses[1]
blocking_connection = BlockingConnection(addr)
# Receive on org.apache
blocking_receiver = blocking_connection.create_receiver(address="org.apache")
apply_options = AtMostOnce()
# Sender to to org.apache
blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options)
msg = Message(body=hello_world_3)
# Send a message
blocking_sender.send(msg)
received_message = blocking_receiver.receive()
self.assertEqual(hello_world_3, received_message.body)
local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
# Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
# that the message was link routed
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache').deliveriesEgress,
"deliveriesEgress is wrong")
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache').deliveriesIngress,
"deliveriesIngress is wrong")
#blocking_receiver.close()
blocking_connection.close()
def test_full_link_route_match_1(self):
"""
This test is pretty much the same as the previous test (test_full_link_route_match) but the connection is
made to router QDR.B instead of QDR.C and we expect the message to be link routed successfully.
"""
hello_world_4 = "Hello World_4!"
addr = self.routers[2].addresses[0]
blocking_connection = BlockingConnection(addr)
# Receive on org.apache
blocking_receiver = blocking_connection.create_receiver(address="org.apache")
apply_options = AtMostOnce()
# Sender to to org.apache
blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options)
msg = Message(body=hello_world_4)
# Send a message
blocking_sender.send(msg)
received_message = blocking_receiver.receive()
self.assertEqual(hello_world_4, received_message.body)
local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
# Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
# that the message was link routed
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache').deliveriesEgress,
"deliveriesEgress is wrong")
self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
name='M0org.apache').deliveriesIngress,
"deliveriesIngress is wrong")
#blocking_receiver.close()
blocking_connection.close()
if __name__ == '__main__':
unittest.main(main_module())