blob: a796667602454a06b62490943e3894b839a9709b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import unittest
from proton import Message, Timeout
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
from proton.handlers import MessagingHandler
from proton.reactor import Container
# ====================================================
# Helper classes for all tests.
# ====================================================
# Named timers allow test code to distinguish between several
# simultaneous timers, going off at different rates.
class Timeout (object):
def __init__(self, parent, name):
self.parent = parent
self.name = name
def on_timer_task(self, event):
self.parent.timeout(self.name)
# ================================================================
# Setup
# ================================================================
class TopologyAdditionTests (TestCase):
@classmethod
def setUpClass(cls):
super(TopologyAdditionTests, cls).setUpClass()
def router(name, more_config):
config = [('router', {'mode': 'interior', 'id': name}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'})
] \
+ more_config
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
cls.routers = []
client_ports = dict()
client_ports['A'] = cls.tester.get_port()
client_ports['B'] = cls.tester.get_port()
client_ports['C'] = cls.tester.get_port()
client_ports['D'] = cls.tester.get_port()
cls.inter_router_ports = dict()
cls.inter_router_ports['A'] = cls.tester.get_port()
cls.inter_router_ports['B'] = cls.tester.get_port()
initial_cost = 10
lower_cost = 8
higher_cost = 12
# Only routers A and B are set up initially by this class.
# Routers C and D are started by the test itself.
router_A_config = [
('listener',
{'port': client_ports['A'],
'role': 'normal',
'stripAnnotations': 'no'
}
),
('listener',
{'role': 'inter-router',
'port': cls.inter_router_ports['A']
}
)
]
router_B_config = [
('listener',
{'port': client_ports['B'],
'role': 'normal',
'stripAnnotations': 'no'
}
),
('listener',
{'role': 'inter-router',
'port': cls.inter_router_ports['B'],
'stripAnnotations': 'no'
}
),
('connector',
{'name': 'AB_connector',
'role': 'inter-router',
'port': cls.inter_router_ports['A'],
'cost': initial_cost,
'stripAnnotations': 'no'
}
)
]
router('A', router_A_config)
router('B', router_B_config)
router_A = cls.routers[0]
router_B = cls.routers[1]
router_A.wait_router_connected('B')
cls.A_addr = router_A.addresses[0]
cls.B_addr = router_B.addresses[0]
# The two connections that this router will make, AC and BC,
# will be lower cost than the direct AB route that the network
# already has.
cls.router_C_config = [
('listener',
{'port': client_ports['C'],
'role': 'normal',
'stripAnnotations': 'no'
}
),
('connector',
{'name': 'AC_connector',
'role': 'inter-router',
'port': cls.inter_router_ports['A'],
'cost': int(lower_cost / 2),
'stripAnnotations': 'no',
'linkCapacity' : 1000
}
),
('connector',
{'name': 'BC_connector',
'role': 'inter-router',
'port': cls.inter_router_ports['B'],
'cost': int(lower_cost / 2),
'stripAnnotations': 'no',
'linkCapacity' : 1000
}
)
]
# The two connections that this router will make, AD and BD,
# will be higher cost than the other paths the networks already has
# available to get from A to B.
cls.router_D_config = [
('listener',
{'port': client_ports['D'],
'role': 'normal',
'stripAnnotations': 'no'
}
),
('connector',
{'name': 'AD_connector',
'role': 'inter-router',
'port': cls.inter_router_ports['A'],
'cost': int(higher_cost / 2),
'stripAnnotations': 'no',
'linkCapacity' : 1000
}
),
('connector',
{'name': 'BD_connector',
'role': 'inter-router',
'port': cls.inter_router_ports['B'],
'cost': int(higher_cost / 2),
'stripAnnotations': 'no',
'linkCapacity' : 1000
}
)
]
# This method allows test code to add new routers during the test,
# rather than only at startup like A and B above.
def addRouter(self, name, more_config) :
config = [('router', {'mode': 'interior', 'id': name}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'})
] \
+ more_config
config = Qdrouterd.Config(config)
TopologyAdditionTests.routers.append(TopologyAdditionTests.tester.qdrouterd(name, config, wait=True))
def test_01_new_route_low_cost(self):
# During the test, test code will add a new router C,
# connecting A and B with new low-cost links. At that
# point the test's messages should switch from using
# route AB to using route ACB.
# By passing both of these routes to the test, I tell
# it to expect both of them to be used.
# If it terminates with the second path remaining unused
# it will fail.
#
# Since this test alters the path that the messages follow,
# it is OK for some messages to be released rather than
# delivered. It doesn't always happen - depends on timing.
initial_expected_trace = ['0/A', '0/B']
final_expected_trace = ['0/A', '0/C', '0/B']
released_ok = True
test = AddRouter(self.A_addr,
self.B_addr,
"closest/01",
self,
'C',
self.router_C_config,
[initial_expected_trace, final_expected_trace],
released_ok
)
test.run()
self.assertIsNone(test.error)
def test_02_new_route_high_cost(self):
# During the test, test code will add a new router D,
# connecting A and B with new links. But the links are
# higher cost than what already exist. The network should
# ignore them and keep using the lowest cost route that it
# already has.
# We communicate this expectation to the test by sending
# it a single expected trace. The test will fail with
# error if any other traces are encountered.
#
# Since this test does not alter the path that the messages
# follow, it is *not* OK for any messages to be released
# rather than delivered.
only_expected_trace = ['0/A', '0/C', '0/B']
released_ok = False
test = AddRouter(self.A_addr,
self.B_addr,
"closest/02",
self,
'D',
self.router_D_config,
[only_expected_trace],
released_ok
)
test.run()
self.assertIsNone(test.error)
# ================================================================
# Tests
# ================================================================
# --------------------------------------------------------------
#
# First test
# ------------------
#
# Send some messages through the original A---B router network,
# Then change it to look like this:
#
# C
# / \
# / \
# / \
# / \
# A -------- B
#
# But the caller controls what costs are assigned to the two
# new links, so only the caller knows whether messages should
# start to flow through the new route ACB or not. It passes
# that knowledge in to us as a list of expected paths.
# This test's job is to make sure that all the expected paths
# get used by messages, and no others get used.
#
#
# Second test
# ------------------
#
# The triangular network from the first test still exists, and
# we will add to it a new router D which also connects A and B.
#
# C
# / \
# / \
# / \
# / \
# A -------- B
# \ /
# \ /
# \ /
# \ /
# D
# As in the first test, the caller tells us what routes ought
# to be followed, by putting them in the 'expected_traces' arg.
#
# --------------------------------------------------------------
class AddRouter (MessagingHandler):
def __init__(self,
send_addr,
recv_addr,
destination,
parent,
new_router_name,
new_router_config,
expected_traces,
released_ok
):
super(AddRouter, self).__init__(prefetch=100)
self.send_addr = send_addr
self.recv_addr = recv_addr
self.dest = destination
self.parent = parent
self.new_router_name = new_router_name
self.new_router_config = new_router_config
self.released_ok = released_ok
self.error = None
self.sender = None
self.receiver = None
self.n_messages = 30
self.n_sent = 0
self.n_received = 0
self.n_released = 0
self.n_accepted = 0
self.test_timer = None
self.send_timer = None
self.timeout_count = 0
self.reactor = None
self.container = None
self.finishing = False
# The parent sends us a list of the traces we
# ought to see on messages.
# Make a little data structure that
# will keep track of how many times each trace was seen.
self.expected_trace_counts = list()
for i in range(len(expected_traces)) :
self.expected_trace_counts.append([expected_traces[i], 0])
def run(self) :
Container(self).run()
# Close everything and allow the test to terminate.
def bail(self, reason_for_bailing) :
self.finishing = True
self.error = reason_for_bailing
self.receiver.close()
self.send_conn.close()
self.recv_conn.close()
self.test_timer.cancel()
self.send_timer.cancel()
# There are two timers. The 'test' timer should only expire if
# something has gone wrong, in which case it terminates the test.
# The 'send' timer expires frequently, and every time it goes off
# we send out a little batch of messages.
def timeout(self, name):
if self.finishing :
return
self.timeout_count += 1
if name == "test" :
self.bail("Timeout Expired: %d messages received, %d expected." % (self.n_received, self.n_messages))
elif name == "send" :
self.send()
self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))
# At T+5, create the new router with link costs as
# specified by parent. We do it partway into the test
# so that some messages will flow through the original
# network, and some will flow through the network with
# the new router added.
if self.timeout_count == 5 :
self.parent.addRouter(self.new_router_name, self.new_router_config)
def on_start(self, event):
self.reactor = event.reactor
self.container = event.container
self.test_timer = self.reactor.schedule(TIMEOUT, Timeout(self, "test"))
self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))
self.send_conn = event.container.connect(self.send_addr)
self.recv_conn = event.container.connect(self.recv_addr)
self.sender = event.container.create_sender(self.send_conn, self.dest)
self.receiver = event.container.create_receiver(self.recv_conn, self.dest)
self.receiver.flow(self.n_messages)
# ------------------------------------------------------------
# Sender Side
# ------------------------------------------------------------
def send(self):
if self.n_sent >= self.n_messages :
return
# Send little bursts of 3 messages every sender-timeout.
for _ in range(3) :
msg = Message(body=self.n_sent)
self.sender.send(msg)
self.n_sent += 1
if self.n_sent == self.n_messages :
return
# The caller of this tests decides whether it is OK or
# not OK to have some messages released during the test.
def on_released(self, event) :
if self.released_ok :
self.n_released += 1
self.check_count()
else :
self.bail("a message was released.")
def on_accepted(self, event) :
self.n_accepted += 1
self.check_count()
#
# Do the released plus the accepted messages add up to the number
# that were sent? If so, bail out with success.
# Do NOT end the test if the number is still shy of the expected
# total. The callers of this method just call it every time they
# get something -- it will be called many times poer test.
#
# Pleae Note:
# This check is on the 'sender' side of this test, rather than the
# 'receiver' side, because it is to the sender that we make a
# guarantee: namely, that the sender should know the disposition of
# all sent messages -- whether they have been accepted by the receiver,
# or released by the router network.
#
def check_count(self) :
if self.n_accepted + self.n_released == self.n_messages :
self.finishing = True
self.finish_test()
# ------------------------------------------------------------
# Receiver Side
# ------------------------------------------------------------
def on_message(self, event):
if self.finishing :
return
self.n_received += 1
trace = event.message.annotations['x-opt-qd.trace']
# Introduce flaws for debugging.
# if self.n_received == 13 :
# trace = [ '0/B', '0/A', '0/D' ]
# if self.n_received == 13 :
# self.n_received -= 1
self.record_trace(trace)
self.check_count()
# Compare the trace that came from a message to the list of
# traces the caller told us to expect. If it is one of the
# expected traces, count it. Otherwise, fail the test.
def record_trace(self, observed_trace):
for trace_record in self.expected_trace_counts :
trace = trace_record[0]
if observed_trace == trace :
trace_record[1] += 1
return
# If we get here, the trace is one we were not expecting. That's bad.
self.bail("Unexpected trace: %s" % observed_trace)
# Shut down everything and make sure that all of the extected traces
# have been seen.
def finish_test(self) :
self.test_timer.cancel()
self.send_timer.cancel()
for trace_record in self.expected_trace_counts :
count = trace_record[1]
# Deliberate flaw for debugging.
# count = 0
if count <= 0 :
self.bail("Trace %s was not seen." % trace_record[0])
return
# success
self.bail(None)
if __name__ == '__main__':
unittest.main(main_module())