blob: c05954aeac0e17e52c50df1c74ec4bd0dbabf37e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from proton import Message, Timeout
from system_test import TestCase, Qdrouterd, main_module, Process, unittest
from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_dispatch_internal.compat import UNICODE
#------------------------------------------------
# Helper classes for all tests.
#------------------------------------------------
class Timeout(object):
"""
Named timeout object can handle multiple simultaneous
timers, by telling the parent which one fired.
"""
def __init__ ( self, parent, name ):
self.parent = parent
self.name = name
def on_timer_task ( self, event ):
self.parent.timeout ( self.name )
class ManagementMessageHelper ( object ):
"""
Format management messages.
"""
def __init__ ( self, reply_addr ):
self.reply_addr = reply_addr
def make_router_link_query ( self ) :
props = { 'count': '100',
'operation': 'QUERY',
'entityType': 'org.apache.qpid.dispatch.router.link',
'name': 'self',
'type': 'org.amqp.management'
}
attrs = []
attrs.append ( UNICODE('linkType') )
attrs.append ( UNICODE('linkDir') )
attrs.append ( UNICODE('deliveryCount') )
attrs.append ( UNICODE('priority') )
msg_body = { }
msg_body [ 'attributeNames' ] = attrs
return Message ( body=msg_body, properties=props, reply_to=self.reply_addr )
#================================================================
# Setup
#================================================================
class PriorityTests ( TestCase ):
@classmethod
def setUpClass(cls):
super(PriorityTests, cls).setUpClass()
def router(name, more_config):
config = [ ('router', {'mode': 'interior', 'id': name, 'workerThreads': 4}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'})
] \
+ more_config
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
cls.routers = []
# The sender will send all its messages with magic_message_priority.
# The first router will set target addr priority to magic_address_priority.
# It is important *not* to choose 4 for either of these priorities,
# since that is the default message priority.
cls.magic_message_priority = 3
cls.magic_address_priority = 7
link_cap = 100
A_client_port = cls.tester.get_port()
B_client_port = cls.tester.get_port()
C_client_port = cls.tester.get_port()
A_inter_router_port = cls.tester.get_port()
B_inter_router_port = cls.tester.get_port()
C_inter_router_port = cls.tester.get_port()
A_config = [
( 'listener',
{ 'port' : A_client_port,
'role' : 'normal',
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
),
( 'listener',
{ 'role' : 'inter-router',
'port' : A_inter_router_port,
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
),
( 'address',
{ 'prefix' : 'speedy',
'priority' : cls.magic_address_priority,
'distribution' : 'closest'
}
),
]
cls.B_config = [
( 'listener',
{ 'port' : B_client_port,
'role' : 'normal',
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
),
( 'listener',
{ 'role' : 'inter-router',
'port' : B_inter_router_port,
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
),
( 'connector',
{ 'name' : 'BA_connector',
'role' : 'inter-router',
'port' : A_inter_router_port,
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
)
]
C_config = [
( 'listener',
{ 'port' : C_client_port,
'role' : 'normal',
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
),
( 'listener',
{ 'role' : 'inter-router',
'port' : C_inter_router_port,
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
),
( 'connector',
{ 'name' : 'CB_connector',
'role' : 'inter-router',
'port' : B_inter_router_port,
'linkCapacity' : link_cap,
'stripAnnotations' : 'no'
}
)
]
router ( 'A', A_config )
router ( 'B', cls.B_config )
router ( 'C', C_config )
router_A = cls.routers[0]
router_B = cls.routers[1]
router_C = cls.routers[2]
router_A.wait_router_connected('B')
router_A.wait_router_connected('C')
cls.client_addrs = ( router_A.addresses[0],
router_B.addresses[0],
router_C.addresses[0]
)
def test_priority ( self ):
name = 'test_01'
test = Priority ( self,
name,
self.client_addrs,
"speedy/01",
self.magic_message_priority,
self.magic_address_priority
)
test.run()
self.assertEqual ( None, test.error )
#================================================================
# Tests
#================================================================
class Priority ( MessagingHandler ):
# In this test we will have a linear network of 3 routers.
# The sender attaches at A, and the receiver at C.
#
# receiver <--- C <--- B <--- A <--- sender
#
# Priority -- whether message or address -- only operates
# on inter-router links. The links from A to B will show
# address-priority overriding message-priority. When a
# router does not set any message priority, then messages
# are routed acording to their intrinsic priority which
# was assigned by the sender. This will be shown by the
# connection from router B to C.
#
# The address that the clients use has a prefix of 'speedy'.
# Router A will assign a priority of magic_addr_priority to all
# 'speedy' addresses.
# No other routers will assign any address priorities.
#
# The sending client will assign a priority of magic_msg_priority
# to all the messages it sends.
#
# So what should happen is:
#
# 1. at router A, all the 'speedy' messages go out with
# magic_addr_priority, because addr priority takes precedence.
#
# 2. at router B, they all go out with magic_msg_priority,
# because that router has not assigned any addr priority,
# so the intrinsic message priorities are used.
#
# 3. Nothing special happens at router C, because it is sending
# the messages out over a connection to an endpoint, which
# is not an inter-router connection.
#
# In this test we will send a known number of messages and
# then send management queries to A and B to learn at what
# priorities the messages actually travelled.
def __init__ ( self, parent, test_name, client_addrs, destination, magic_msg_priority, magic_addr_priority ):
super(Priority, self).__init__(prefetch=10)
self.parent = parent
self.client_addrs = client_addrs
self.dest = destination
self.magic_msg_priority = magic_msg_priority
self.magic_addr_priority = magic_addr_priority
self.error = None
self.sender = None
self.receiver = None
self.send_timer = None
self.n_messages = 100
self.n_sent = 0
self.send_conn = None
self.recv_conn = None
self.n_received = 0
self.reactor = None
self.timer_count = 0
self.sent_queries = False
self.finishing = False
self.goals = 0
self.n_goals = 2
self.connections = list()
self.A_addr = self.client_addrs[0]
self.B_addr = self.client_addrs[1]
self.C_addr = self.client_addrs[2]
self.routers = {
'A' : dict(),
'B' : dict()
}
# Shut down everything and exit.
def bail ( self, text ):
self.send_timer.cancel ( )
self.finishing = True
self.error = text
for conn in self.connections :
conn.close()
def make_connection ( self, event, addr ) :
cnx = event.container.connect ( addr )
self.connections.append ( cnx )
return cnx
def on_start ( self, event ):
self.reactor = event.reactor
self.send_conn = self.make_connection ( event, self.A_addr )
self.recv_conn = self.make_connection ( event, self.C_addr )
self.sender = event.container.create_sender ( self.send_conn, self.dest )
self.receiver = event.container.create_receiver ( self.recv_conn, self.dest )
self.receiver.flow ( 100 )
self.routers['A'] ['mgmt_conn'] = self.make_connection ( event, self.A_addr )
self.routers['A'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['A'] ['mgmt_conn'], dynamic=True )
self.routers['A'] ['mgmt_sender'] = event.container.create_sender ( self.routers['A'] ['mgmt_conn'], "$management" )
self.routers['B'] ['mgmt_conn'] = self.make_connection ( event, self.B_addr )
self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['B'] ['mgmt_conn'], dynamic=True )
self.routers['B'] ['mgmt_sender'] = event.container.create_sender ( self.routers['B'] ['mgmt_conn'], "$management" )
self.send_timer = event.reactor.schedule ( 2, Timeout(self, "send") )
def timeout ( self, name ):
if name == 'send':
self.send ( )
if not self.sent_queries :
self.test_timer = self.reactor.schedule ( 1, Timeout(self, "send") )
def on_link_opened ( self, event ) :
# A mgmt link has opened. Create its management helper.
# ( Now we know the address that the management helper should use as
# the "reply-to" in its management message. )
if event.receiver == self.routers['A'] ['mgmt_receiver'] :
event.receiver.flow ( 1000 )
self.routers['A'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
elif event.receiver == self.routers['B'] ['mgmt_receiver'] :
event.receiver.flow ( 1000 )
self.routers['B'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
def send ( self ) :
if self.sender.credit <= 0:
self.receiver.flow ( 100 )
return
# First send the payload messages.
if self.n_sent < self.n_messages :
for i in range(50) :
msg = Message ( body=self.n_sent )
msg.priority = 3
self.sender.send ( msg )
self.n_sent += 1
# Then send the management queries.
# But only send them once.
elif not self.sent_queries :
# Query router A.
mgmt_helper = self.routers['A'] ['mgmt_helper']
mgmt_sender = self.routers['A'] ['mgmt_sender']
msg = mgmt_helper.make_router_link_query ( )
mgmt_sender.send ( msg )
# Query router B.
mgmt_helper = self.routers['B'] ['mgmt_helper']
mgmt_sender = self.routers['B'] ['mgmt_sender']
msg = mgmt_helper.make_router_link_query ( )
mgmt_sender.send ( msg )
self.sent_queries = True
# This test has two goals: get the response from router A
# and from router B. As they come in, we check them. If
# the response is unsatisfactory we bail out
def goal_satisfied ( self ) :
self.goals += 1
if self.goals >= self.n_goals :
self.bail ( None )
def on_message ( self, event ) :
# Don't take any more messages if 'bail' has been called.
if self.finishing :
return
msg = event.message
if event.receiver == self.routers['A'] ['mgmt_receiver'] :
# Router A has only one set of outgoing links, and it
# has set a priority for our target address. We should
# see all the messages we sent go out with that priority.
magic = self.magic_addr_priority
if 'results' in msg.body :
results = msg.body['results']
# I do not want to trust the possibility that the
# results will be returned to me in priority-order.
# Instead, I explicitly asked for the link priority
# in the management query that was sent. Now I will
# loop through all the results, and look for the one
# with the desired priority.
for i in range(len(results)) :
result = results[i]
role = result[0]
dir = result[1]
message_count = result[2]
priority = result[3]
if role == "inter-router" and dir == "out" and priority == magic :
if message_count >= self.n_messages :
self.goal_satisfied ( )
return
else :
self.bail ( "Router A priority %d had %d messages instead of %d." %
(magic, message_count, self.n_messages) )
return
elif event.receiver == self.routers['B'] ['mgmt_receiver'] :
# Router B has two sets of outgoing links, and it has not
# set a priority for the target address. We should see all
# of our messages going out over the message-intrinsic
# priority that the sending client used -- one one of those
# two sets of outgoing links.
magic = self.magic_msg_priority
if 'results' in msg.body :
message_counts = list()
results = msg.body['results']
for i in range(len(results)) :
result = results[i]
role = result[0]
dir = result[1]
message_count = result[2]
priority = result[3]
if role == "inter-router" and dir == "out" :
if priority == magic :
message_counts.append ( message_count )
if self.n_messages in message_counts :
self.goal_satisfied ( )
else :
self.bail ( "No outgoing link on router B had %d messages at priority 3" % self.n_messages )
else :
# This is a payload message -- not management. Just count it.
self.n_received += 1
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())