# Source blob: e50fc844dcf8d2ae6138de2c1176dfc6506dfc00
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from proton import Message, Timeout
from system_test import TestCase, Qdrouterd, main_module, unittest
from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_dispatch_internal.compat import UNICODE
# ------------------------------------------------
# Helper classes for all tests.
# ------------------------------------------------
class Timeout(object):
    """
    Named timeout object can handle multiple simultaneous
    timers, by telling the parent which one fired.

    NOTE(review): this definition shadows the ``Timeout`` name that is
    also imported from ``proton`` at the top of the file.
    """

    def __init__(self, parent, name):
        """
        :param parent: object that exposes a ``timeout(name)`` method.
        :param name: label passed back to ``parent.timeout`` on expiry.
        """
        self.parent = parent
        self.name = name

    def on_timer_task(self, event):
        """Timer callback: tell the parent which named timer fired."""
        self.parent.timeout(self.name)
class ManagementMessageHelper(object):
    """
    Format management messages.
    """

    # Entity and request types used for the router-link query.
    LINK_ENTITY_TYPE = 'org.apache.qpid.dispatch.router.link'
    MANAGEMENT_TYPE = 'org.amqp.management'

    # Attributes requested for every link.  The ORDER matters: the code
    # that consumes the response indexes each result row positionally
    # as [link type, link direction, delivery count, priority].
    LINK_ATTRIBUTES = ('linkType', 'linkDir', 'deliveryCount', 'priority')

    def __init__(self, reply_addr):
        """
        :param reply_addr: address placed in ``reply_to`` of every
            management request built by this helper.
        """
        self.reply_addr = reply_addr

    def make_router_link_query(self):
        """
        Build a QUERY request for the router's links.

        :return: a ``Message`` whose body asks for ``LINK_ATTRIBUTES``
            and whose ``reply_to`` is this helper's reply address.
        """
        props = {'count': '100',
                 'operation': 'QUERY',
                 'entityType': self.LINK_ENTITY_TYPE,
                 'name': 'self',
                 'type': self.MANAGEMENT_TYPE
                 }
        # UNICODE() keeps the attribute names as text strings on
        # interpreters where a plain literal is not unicode.
        attrs = [UNICODE(attr) for attr in self.LINK_ATTRIBUTES]
        msg_body = {}
        msg_body['attributeNames'] = attrs
        return Message(body=msg_body, properties=props, reply_to=self.reply_addr)
# ================================================================
# Setup
# ================================================================
class PriorityTests(TestCase):
    """
    Starts a linear three-router network (A, B, C) and runs the
    priority test against it.

    B connects to A's inter-router listener and C connects to B's, so
    traffic from a sender on A to a receiver on C crosses two
    inter-router hops.
    """

    @classmethod
    def setUpClass(cls):
        """Create and start routers A, B and C and record their client addresses."""
        super(PriorityTests, cls).setUpClass()

        def router(name, more_config):
            """Start one interior router with the common config plus more_config."""
            config = [('router', {'mode': 'interior', 'id': name, 'workerThreads': 4}),
                      ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                      ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
                      ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
                      ] \
                + more_config
            config = Qdrouterd.Config(config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

        cls.routers = []

        # The sender will send all its messages with magic_message_priority.
        # The first router will set target addr priority to magic_address_priority.
        # It is important *not* to choose 4 for either of these priorities,
        # since that is the default message priority.
        cls.magic_message_priority = 3
        cls.magic_address_priority = 7

        link_cap = 100

        A_client_port = cls.tester.get_port()
        B_client_port = cls.tester.get_port()
        C_client_port = cls.tester.get_port()

        A_inter_router_port = cls.tester.get_port()
        B_inter_router_port = cls.tester.get_port()
        C_inter_router_port = cls.tester.get_port()

        def listener(role, port):
            """Config entry for a listener with the shared link settings."""
            return ('listener',
                    {'port': port,
                     'role': role,
                     'linkCapacity': link_cap,
                     'stripAnnotations': 'no'
                     })

        def connector(name, port):
            """Config entry for a named inter-router connector to port."""
            return ('connector',
                    {'name': name,
                     'role': 'inter-router',
                     'port': port,
                     'linkCapacity': link_cap,
                     'stripAnnotations': 'no'
                     })

        # Only router A assigns an address priority.
        A_config = [
            listener('normal', A_client_port),
            listener('inter-router', A_inter_router_port),
            ('address',
             {'prefix': 'speedy',
              'priority': cls.magic_address_priority,
              'distribution': 'closest'
              }),
        ]

        cls.B_config = [
            listener('normal', B_client_port),
            listener('inter-router', B_inter_router_port),
            connector('BA_connector', A_inter_router_port),
        ]

        C_config = [
            listener('normal', C_client_port),
            listener('inter-router', C_inter_router_port),
            connector('CB_connector', B_inter_router_port),
        ]

        router('A', A_config)
        router('B', cls.B_config)
        router('C', C_config)

        router_A = cls.routers[0]
        router_B = cls.routers[1]
        router_C = cls.routers[2]

        # The client ('normal') listener is the first listener configured
        # on each router, so it is the first address of each router.
        cls.client_addrs = (router_A.addresses[0],
                            router_B.addresses[0],
                            router_C.addresses[0]
                            )

    def test_priority(self):
        """Run the Priority handler and fail if it reports an error."""
        name = 'test_01'
        test = Priority(self,
                        name,
                        self.client_addrs,
                        "speedy/01",
                        self.magic_message_priority,
                        self.magic_address_priority
                        )
        test.run()
        self.assertEqual(None, test.error)
# ================================================================
# Tests
# ================================================================
class Priority(MessagingHandler):
    """
    Verify address priority and message priority on inter-router links.
    """

    # In this test we will have a linear network of 3 routers.
    # The sender attaches at A, and the receiver at C.
    #
    #   receiver <--- C <--- B <--- A <--- sender
    #
    # Priority -- whether message or address -- only operates
    # on inter-router links. The links from A to B will show
    # address-priority overriding message-priority. When a
    # router does not set any message priority, then messages
    # are routed according to their intrinsic priority which
    # was assigned by the sender. This will be shown by the
    # connection from router B to C.
    #
    # The address that the clients use has a prefix of 'speedy'.
    # Router A will assign a priority of magic_addr_priority to all
    # 'speedy' addresses.
    # No other routers will assign any address priorities.
    # The sending client will assign a priority of magic_msg_priority
    # to all the messages it sends.
    #
    # So what should happen is:
    #   1. at router A, all the 'speedy' messages go out with
    #      magic_addr_priority, because addr priority takes precedence.
    #   2. at router B, they all go out with magic_msg_priority,
    #      because that router has not assigned any addr priority,
    #      so the intrinsic message priorities are used.
    #   3. Nothing special happens at router C, because it is sending
    #      the messages out over a connection to an endpoint, which
    #      is not an inter-router connection.
    #
    # In this test we will send a known number of messages and
    # then send management queries to A and B to learn at what
    # priorities the messages actually travelled.

    def __init__(self, parent, test_name, client_addrs, destination, magic_msg_priority, magic_addr_priority):
        """
        :param parent: the owning test case.
        :param test_name: name of the test (not used by this handler).
        :param client_addrs: client addresses of routers A, B, C, in that order.
        :param destination: address the payload sender and receiver attach to.
        :param magic_msg_priority: priority the sender puts on each message.
        :param magic_addr_priority: priority router A assigns to the address.
        """
        super(Priority, self).__init__(prefetch=10)
        self.parent = parent
        self.client_addrs = client_addrs
        self.dest = destination

        self.magic_msg_priority = magic_msg_priority
        self.magic_addr_priority = magic_addr_priority

        self.error = None
        self.sender = None
        self.receiver = None
        self.send_timer = None
        self.n_messages = 100
        self.n_sent = 0
        self.send_conn = None
        self.recv_conn = None
        self.n_received = 0
        self.reactor = None
        self.timer_count = 0
        self.sent_queries = False
        self.finishing = False
        self.goals = 0
        self.n_goals = 2
        self.connections = list()
        self.A_addr = self.client_addrs[0]
        self.B_addr = self.client_addrs[1]
        self.C_addr = self.client_addrs[2]
        # Per-router management state: connection, receiver, sender, helper.
        self.routers = {
            'A': dict(),
            'B': dict()
        }

    # Shut down everything and exit.
    def bail(self, text):
        """
        Stop the test.

        :param text: error description, or None when the test succeeded.
        """
        self.finishing = True
        self.error = text
        for conn in self.connections:
            conn.close()

    def make_connection(self, event, addr):
        """Open a connection to addr and remember it so bail() can close it."""
        cnx = event.container.connect(addr)
        self.connections.append(cnx)
        return cnx

    def on_start(self, event):
        """Create payload and management links, then arm the first send timer."""
        self.reactor = event.reactor
        self.send_conn = self.make_connection(event, self.A_addr)
        self.recv_conn = self.make_connection(event, self.C_addr)

        self.sender = event.container.create_sender(self.send_conn, self.dest)
        self.receiver = event.container.create_receiver(self.recv_conn, self.dest)

        for router_name, addr in (('A', self.A_addr), ('B', self.B_addr)):
            router = self.routers[router_name]
            router['mgmt_conn'] = self.make_connection(event, addr)
            # Dynamic receiver: the router chooses the reply address.
            router['mgmt_receiver'] = event.container.create_receiver(router['mgmt_conn'], dynamic=True)
            router['mgmt_sender'] = event.container.create_sender(router['mgmt_conn'], "$management")

        self.send_timer = event.reactor.schedule(2, Timeout(self, "send"))

    def timeout(self, name):
        """Called by a named Timeout; 'send' drives send() until queries are out."""
        if name == 'send':
            self.send()
            if not self.sent_queries:
                self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))

    def on_link_opened(self, event):
        """Create the management helper once a management receiver is open."""
        # A mgmt link has opened. Create its management helper.
        # ( Now we know the address that the management helper should use as
        #   the "reply-to" in its management message. )
        for router in self.routers.values():
            if event.receiver == router['mgmt_receiver']:
                router['mgmt_helper'] = ManagementMessageHelper(event.receiver.remote_source.address)
                break

    def send(self):
        """Send the next batch of payload messages, or the management queries."""
        # No credit yet: try again when the timer next fires.
        if self.sender.credit <= 0:
            return

        # First send the payload messages.
        if self.n_sent < self.n_messages:
            batch = min(50, self.n_messages - self.n_sent)
            for _ in range(batch):
                msg = Message(body=self.n_sent)
                msg.priority = self.magic_msg_priority
                self.sender.send(msg)
                self.n_sent += 1
        # Then send the management queries.
        # But only send them once.
        elif not self.sent_queries:
            # The reply addresses are only known after the management
            # receivers have opened; until then, wait for the next timer.
            if any('mgmt_helper' not in router for router in self.routers.values()):
                return
            for router_name in ('A', 'B'):
                router = self.routers[router_name]
                msg = router['mgmt_helper'].make_router_link_query()
                router['mgmt_sender'].send(msg)
            self.sent_queries = True

    # This test has two goals: get the response from router A
    # and from router B. As they come in, we check them. If
    # the response is unsatisfactory we bail out
    def goal_satisfied(self):
        """Count one satisfied goal; finish without error when all are met."""
        self.goals += 1
        if self.goals >= self.n_goals:
            self.bail(None)

    def on_message(self, event):
        """Check management responses from A and B; count payload messages."""
        # Don't take any more messages if 'bail' has been called.
        if self.finishing:
            return

        msg = event.message

        if event.receiver == self.routers['A']['mgmt_receiver']:
            self._check_router_a(msg)
        elif event.receiver == self.routers['B']['mgmt_receiver']:
            self._check_router_b(msg)
        else:
            # This is a payload message -- not management. Just count it.
            self.n_received += 1

    def _query_results(self, msg, router_name):
        """Return the 'results' rows of a query response, or bail and return None."""
        body = msg.body
        if isinstance(body, dict) and 'results' in body:
            return body['results']
        self.bail("Router %s management response had no results." % router_name)
        return None

    def _check_router_a(self, msg):
        """Router A must have sent every message at the address priority."""
        # Router A has only one set of outgoing links, and it
        # has set a priority for our target address. We should
        # see all the messages we sent go out with that priority.
        magic = self.magic_addr_priority
        results = self._query_results(msg, 'A')
        if results is None:
            return

        # I do not want to trust the possibility that the
        # results will be returned to me in priority-order.
        # Instead, I explicitly asked for the link priority
        # in the management query that was sent. Now I will
        # loop through all the results, and look for the one
        # with the desired priority.
        for result in results:
            role, direction, message_count, priority = result[:4]
            if role == "inter-router" and direction == "out" and priority == magic:
                if message_count >= self.n_messages:
                    self.goal_satisfied()
                else:
                    self.bail("Router A priority %d had %d messages instead of %d." %
                              (magic, message_count, self.n_messages))
                return

        # Without this the reactor would run forever waiting for a goal
        # that can no longer be satisfied.
        self.bail("Router A reported no outgoing inter-router link at priority %d." % magic)

    def _check_router_b(self, msg):
        """Router B must have sent every message at the message priority."""
        # Router B has two sets of outgoing links, and it has not
        # set a priority for the target address. We should see all
        # of our messages going out over the message-intrinsic
        # priority that the sending client used -- on one of those
        # two sets of outgoing links.
        magic = self.magic_msg_priority
        results = self._query_results(msg, 'B')
        if results is None:
            return

        message_counts = list()
        for result in results:
            role, direction, message_count, priority = result[:4]
            if role == "inter-router" and direction == "out":
                if priority == magic:
                    message_counts.append(message_count)

        if self.n_messages in message_counts:
            self.goal_satisfied()
        else:
            self.bail("No outgoing link on router B had %d messages at priority %d" %
                      (self.n_messages, magic))

    def run(self):
        """Run this handler in a proton Container until the test finishes."""
        Container(self).run()
# Run this module's tests when it is executed as a script.
if __name__ == '__main__':
    unittest.main(main_module())