blob: 32f645942e87d5445ec6d6c4fdce0f296dc314e7 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from proton import Message, Timeout
from system_test import TestCase, Qdrouterd, main_module
from system_test import unittest
from proton.handlers import MessagingHandler
from proton.reactor import Container
import time
#------------------------------------------------
# Helper classes for all tests.
#------------------------------------------------
class Timeout(object):
    """
    Timer callback object that carries a name.

    One parent may have several of these scheduled at the same time;
    when a timer fires, it hands its own name to the parent's
    timeout() method so the parent can tell which one expired.
    """

    def __init__(self, parent, name):
        # 'parent' must provide a timeout(name) method.
        self.name = name
        self.parent = parent

    def on_timer_task(self, event):
        """Report this timer's name to the parent; 'event' is ignored."""
        owner, label = self.parent, self.name
        owner.timeout(label)
class ManagementMessageHelper(object):
    """
    Builds management request messages about router connectors.

    Every message produced carries the reply-to address that was
    supplied when the helper was constructed.
    """

    # Management entity type named in every connector request.
    _CONNECTOR_TYPE = 'org.apache.qpid.dispatch.connector'

    def __init__(self, reply_addr):
        self.reply_addr = reply_addr

    def _connector_request(self, operation, connector_name):
        """Return a Message asking for 'operation' on the named connector."""
        request_properties = {'operation': operation,
                              'type': self._CONNECTOR_TYPE,
                              'name': connector_name}
        return Message(properties=request_properties,
                       reply_to=self.reply_addr)

    def make_connector_query(self, connector_name):
        """Return a READ request for the connector called connector_name."""
        return self._connector_request('READ', connector_name)

    def make_connector_delete_command(self, connector_name):
        """Return a DELETE request for the connector called connector_name."""
        return self._connector_request('DELETE', connector_name)
#------------------------------------------------
# END Helper classes for all tests.
#------------------------------------------------
#================================================================
# Setup
#================================================================
class TopologyTests(TestCase):
    """
    Tests that run against a mesh of four interior routers: A, B, C, D.
    """

    @classmethod
    def setUpClass(cls):
        """
        Start routers A, B, C and D (in that order), connect them as
        drawn below, wait until A reports B, C and D as connected, and
        record each router's first address in cls.client_addrs.
        """
        super(TopologyTests, cls).setUpClass()

        cls.routers = []

        # --- small builders for the repeated config stanzas -----------

        def client_listener(port):
            return ('listener',
                    {'port': port,
                     'role': 'normal',
                     'stripAnnotations': 'no'})

        def inter_router_listener(port):
            return ('listener',
                    {'role': 'inter-router',
                     'port': port,
                     'stripAnnotations': 'no'})

        def connector(name, port, cost):
            return ('connector',
                    {'name': name,
                     'role': 'inter-router',
                     'port': port,
                     'verifyHostname': 'no',
                     'cost': cost,
                     'stripAnnotations': 'no'})

        def router(name, more_config):
            # Every router gets the same router/address stanzas,
            # followed by its own listeners and connectors.
            common = [('router', {'mode': 'interior', 'id': name}),
                      ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                      ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
                      ('address', {'prefix': 'multicast', 'distribution': 'multicast'})]
            router_config = Qdrouterd.Config(common + more_config)
            cls.routers.append(cls.tester.qdrouterd(name, router_config, wait=True))

        # Ports are requested in this fixed order: client ports first,
        # then the inter-router listener ports.
        A_client_port = cls.tester.get_port()
        B_client_port = cls.tester.get_port()
        C_client_port = cls.tester.get_port()
        D_client_port = cls.tester.get_port()

        A_inter_router_port = cls.tester.get_port()
        B_inter_router_port = cls.tester.get_port()
        C_inter_router_port = cls.tester.get_port()

        #
        #  Topology of the 4-mesh, with costs of connections marked.
        #  Tail of arrow indicates initiator of connection.
        #
        #                  1
        #         D ----------> A
        #         | \         > ^
        #         |  20\   50/  |
        #         |     \   /   |
        #       1 |      / \    | 100
        #         |    /    \   |
        #         v  /       >  |
        #         C ----------> B
        #                  1
        #
        #  Test 1 TopologyFailover Notes
        #
        #    1. Messages are always sent from A, and go to B.
        #    2. First route should be ADCB.
        #    3. Then we kill connector CD.
        #    4. Next route should be ADB.
        #    5. Then we kill connector BD.
        #    6. Next route should be ACB.
        #    7. Then we kill connector BC.
        #    8. Final route should be AB.
        #
        cls.A_B_cost = 100
        cls.A_C_cost = 50
        cls.A_D_cost = 1
        cls.B_C_cost = 1
        cls.B_D_cost = 20
        cls.C_D_cost = 1

        router('A',
               [client_listener(A_client_port),
                inter_router_listener(A_inter_router_port)])

        router('B',
               [client_listener(B_client_port),
                inter_router_listener(B_inter_router_port),
                connector('AB_connector', A_inter_router_port, cls.A_B_cost)])

        router('C',
               [client_listener(C_client_port),
                inter_router_listener(C_inter_router_port),
                connector('AC_connector', A_inter_router_port, cls.A_C_cost),
                connector('BC_connector', B_inter_router_port, cls.B_C_cost)])

        router('D',
               [client_listener(D_client_port),
                connector('AD_connector', A_inter_router_port, cls.A_D_cost),
                connector('BD_connector', B_inter_router_port, cls.B_D_cost),
                connector('CD_connector', C_inter_router_port, cls.C_D_cost)])

        # Routers were appended in start order, so index 0 is A.
        router_A = cls.routers[0]
        for peer_id in ('B', 'C', 'D'):
            router_A.wait_router_connected(peer_id)

        # First address of A, B, C and D, in that order.
        cls.client_addrs = tuple(started.addresses[0]
                                 for started in cls.routers[:4])

        # 1 means skip that test.
        cls.skip = {'test_01': 0}

    def test_01_topology_failover(self):
        """Run TopologyFailover on 'closest/01' and expect no error."""
        test_key = 'test_01'
        if self.skip[test_key]:
            self.skipTest("Test skipped during development.")

        failover = TopologyFailover(test_key, self.client_addrs, "closest/01")
        failover.run()
        self.assertEqual(None, failover.error)
#================================================================
# Tests
#================================================================
# Also see 'Test 1 TopologyFailover Notes', above.
class TopologyFailover(MessagingHandler):
    """
    Test that the lowest-cost route is always chosen in a 4-mesh
    network topology, as one link after another is lost.
    This test also ensures that connections that have been
    deliberately severed do not get restored.

    Payload messages are sent on a connection to client_addrs[0] and
    received on a connection to client_addrs[1]; management requests
    go to client_addrs[1], [2] and [3] (referred to as routers B, C
    and D).  After run() returns, self.error is None on success or a
    string describing the failure.
    """

    def __init__(self, test_name, client_addrs, destination):
        """
        test_name    -- accepted for the caller's use; not referenced here.
        client_addrs -- sequence of four client addresses, indexed A=0 .. D=3.
        destination  -- address the payload sender and receiver attach to.
        """
        super(TopologyFailover, self).__init__(prefetch=0)
        self.client_addrs = client_addrs
        self.dest = destination
        self.error = None
        self.sender = None
        self.receiver = None
        self.test_timer = None
        self.send_timer = None
        self.n_sent = 0
        self.n_received = 0
        self.n_accepted = 0
        self.n_released = 0
        self.reactor = None
        self.state = None
        self.send_conn = None
        self.recv_conn = None
        self.nap_time = 2
        self.debug = False

        # Holds the management sender, receiver, and 'helper'
        # associated with each router.
        self.routers = {
            'A': dict(),
            'B': dict(),
            'C': dict(),
            'D': dict()
        }

        # The routers we talk management to, each with the connectors
        # it is asked about when its management receiver link opens.
        self.managed_routers = (
            ('B', ('AB_connector',)),
            ('C', ('AC_connector', 'BC_connector')),
            ('D', ('AD_connector', 'BD_connector', 'CD_connector')),
        )

        # These are the expected routing traces, in the order we
        # expect to receive them.
        self.expected_traces = [
            [u'0/A', u'0/D', u'0/C', u'0/B'],
            [u'0/A', u'0/D', u'0/B'],
            [u'0/A', u'0/C', u'0/B'],
            [u'0/A', u'0/B']
        ]
        self.trace_count = 0

        # This tells the system in what order to kill the connectors.
        self.kill_list = (
            ('D', 'CD_connector'),
            ('D', 'BD_connector'),
            ('C', 'BC_connector')
        )

        # Use this to keep track of which connectors we have found
        # when the test is first getting started and we are checking
        # the topology.
        self.connectors_map = {'AB_connector': 0,
                               'AC_connector': 0,
                               'AD_connector': 0,
                               'BC_connector': 0,
                               'BD_connector': 0,
                               'CD_connector': 0
                               }

    # The simple state machine transitions when certain events happen,
    # if certain conditions are met.  The conditions are checked for
    # by the callbacks for the events.
    # The normal sequence of states in the state machine is:
    #    1. starting        -- doesn't do anything
    #    2. checking        -- checks initial topology
    #    3. examine_trace   -- look at routing trace of first message
    #    4. kill_connector  -- kills the first connector (CD)
    #    5. examine_trace   -- checks routing trace of next message
    #    6. kill_connector  -- kills the next connector (BD)
    #    7. examine_trace   -- checks routing trace of next message
    #    8. kill_connector  -- kills the next connector (BC)
    #    9. examine_trace   -- checks routing trace of final message
    #   10. bailing         -- bails out with success
    def state_transition(self, message, new_state):
        """Move to new_state (no-op if already there); 'message' says why."""
        if self.state == new_state:
            return
        self.state = new_state
        self.debug_print("state transition to : %s -- because %s" % (self.state, message))

    def debug_print(self, text):
        """Print 'text' with a timestamp, but only when self.debug is set."""
        if self.debug:
            print("%s %s" % (time.time(), text))

    # Shut down everything and exit.
    def bail(self, text):
        """
        Record the outcome and tear the test down.

        text -- error description, or None when the test succeeded.
        """
        self.error = text
        self.send_conn.close()
        self.recv_conn.close()
        for router, _ in self.managed_routers:
            self.routers[router]['mgmt_conn'].close()
        self.test_timer.cancel()
        self.send_timer.cancel()

    #------------------------------------------------------------------------
    # I want some behavior from this test that is a little too complex
    # to be governed by the usual callback functions.  The way I do this
    # is by making a simple state machine that checks some conditions
    # during some callback, and then either steps forward or terminates
    # the test.
    # The callbacks that activate the state machine are mostly on_message,
    # or timeout.  But there are two different timers: the one-second
    # timer that mostly runs the test, and the 60-second timer that, if it
    # fires, will terminate the test with a timeout error.
    #------------------------------------------------------------------------
    def timeout(self, name):
        """Called by a Timeout object; 'name' is 'test' or 'sender'."""
        if name == 'test':
            # Overall deadline passed: fail the test with the counters
            # we have so far.
            self.state_transition('Timeout Expired', 'bailing')
            self.bail("Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" %
                      (self.n_sent, self.n_received, self.n_accepted))
        elif name == 'sender':
            # Periodic tick: send one payload message while we are
            # waiting to examine a trace, then re-arm the tick.
            if self.state == 'examine_trace':
                self.send()
            self.send_timer = self.reactor.schedule(1, Timeout(self, "sender"))

    def on_start(self, event):
        """Arm both timers and open the payload and management links."""
        self.state_transition('on_start', 'starting')
        self.reactor = event.reactor
        self.test_timer = event.reactor.schedule(60, Timeout(self, "test"))
        self.send_timer = event.reactor.schedule(1, Timeout(self, "sender"))
        self.send_conn = event.container.connect(self.client_addrs[0])  # A
        self.recv_conn = event.container.connect(self.client_addrs[1])  # B

        self.sender = event.container.create_sender(self.send_conn, self.dest)
        self.receiver = event.container.create_receiver(self.recv_conn, self.dest)
        self.receiver.flow(100)

        # I will only send management messages to B, C, and D, because
        # they are the owners of the connections that I will want to delete.
        self.routers['B']['mgmt_conn'] = event.container.connect(self.client_addrs[1])
        self.routers['C']['mgmt_conn'] = event.container.connect(self.client_addrs[2])
        self.routers['D']['mgmt_conn'] = event.container.connect(self.client_addrs[3])

        self.routers['B']['mgmt_receiver'] = event.container.create_receiver(self.routers['B']['mgmt_conn'], dynamic=True)
        self.routers['C']['mgmt_receiver'] = event.container.create_receiver(self.routers['C']['mgmt_conn'], dynamic=True)
        self.routers['D']['mgmt_receiver'] = event.container.create_receiver(self.routers['D']['mgmt_conn'], dynamic=True)

        self.routers['B']['mgmt_sender'] = event.container.create_sender(self.routers['B']['mgmt_conn'], "$management")
        self.routers['C']['mgmt_sender'] = event.container.create_sender(self.routers['C']['mgmt_conn'], "$management")
        self.routers['D']['mgmt_sender'] = event.container.create_sender(self.routers['D']['mgmt_conn'], "$management")

    #-----------------------------------------------------------------
    # At start-time, as the links to the three managed routers
    # open, check each one to make sure that it has all the expected
    # connections.
    #-----------------------------------------------------------------
    def on_link_opened(self, event):
        """
        When a router's management receiver opens, give it credit,
        create that router's message helper from the receiver's remote
        source address, and query each connector the router owns.
        """
        self.state_transition('on_link_opened', 'checking')
        for router, connectors in self.managed_routers:
            if event.receiver == self.routers[router]['mgmt_receiver']:
                event.receiver.flow(1000)
                self.routers[router]['mgmt_helper'] = ManagementMessageHelper(event.receiver.remote_source.address)
                for connector in connectors:
                    self.connector_check(router, connector)
                break

    def send(self):
        """Send a single payload message, if the sender has credit."""
        if self.sender.credit <= 0:
            # No credit to send with: issue more credit on the payload
            # receiver and try again on the next tick.
            self.receiver.flow(100)
            return

        # Send messages one at a time.
        msg = Message(body=self.n_sent)
        self.sender.send(msg)
        self.n_sent += 1
        self.debug_print("sent: %d" % self.n_sent)

    def on_message(self, event):
        """Dispatch to the management or the payload message handler."""
        from_management = any(event.receiver == self.routers[router]['mgmt_receiver']
                              for router, _ in self.managed_routers)
        if from_management:
            self._on_management_message(event)
        else:
            self._on_payload_message(event)

    def _on_management_message(self, event):
        """Handle a reply to a connector query or a connector delete."""
        if self.state == 'checking':
            connection_name = event.message.body['name']
            if connection_name not in self.connectors_map:
                self.state_transition("bad connection name: %s" % connection_name, 'bailing')
                self.bail("bad connection name: %s" % connection_name)
                # Stop here so the count below cannot move us out of
                # the 'bailing' state.
                return
            self.connectors_map[connection_name] = 1
            n_connections = sum(self.connectors_map.values())
            if n_connections == len(self.connectors_map):
                self.state_transition("all %d connections found" % n_connections, 'examine_trace')
        elif self.state == 'kill_connector':
            if event.message.properties["statusDescription"] == 'No Content':
                # We are in the process of killing a connector, and
                # have received the response to the kill message.
                self.state_transition('got kill response', 'examine_trace')
                # This sleep is here because one early bug that this test found
                # (and which is now fixed) involved connections that had been
                # deleted coming back sometimes. It was a race and only happened
                # very occasionally -- but with a pause here, after getting
                # confirmation that we have successfully deleted the connector,
                # the bug would show up 60 to 75% of the time. I think that leaving
                # this sleep here is the only way to ensure that that particular
                # bug stays fixed.
                time.sleep(self.nap_time)

    def _on_payload_message(self, event):
        """Count a payload message and, if due, compare its routing trace."""
        self.n_received += 1
        if self.state != 'examine_trace':
            return

        trace = event.message.annotations['x-opt-qd.trace']
        expected = self.expected_traces[self.trace_count]
        if trace != expected:
            self.state_transition("expected trace %s but got %s" % (expected, trace), 'bailing')
            self.bail("expected trace %s but got %s" % (expected, trace))
            return

        if self.trace_count == len(self.expected_traces) - 1:
            # The last expected route was seen: the test has passed.
            self.state_transition('final expected trace %s observed' % expected, 'bailing')
            self.bail(None)
            return

        self.state_transition("expected trace %d observed successfully %s" % (self.trace_count, expected), 'kill_connector')
        self.kill_a_connector(self.kill_list[self.trace_count])
        self.trace_count += 1

    def on_accepted(self, event):
        self.n_accepted += 1

    def on_released(self, event):
        self.n_released += 1

    def connector_check(self, router, connector):
        """Send a READ query for 'connector' to the given router."""
        self.debug_print("checking connector for router %s" % router)
        mgmt_helper = self.routers[router]['mgmt_helper']
        mgmt_sender = self.routers[router]['mgmt_sender']
        msg = mgmt_helper.make_connector_query(connector)
        mgmt_sender.send(msg)

    def kill_a_connector(self, target):
        """Send a DELETE for a connector; target is (router, connector)."""
        router, connector = target[0], target[1]
        self.debug_print("killing connector %s on router %s" % (connector, router))
        mgmt_helper = self.routers[router]['mgmt_helper']
        mgmt_sender = self.routers[router]['mgmt_sender']
        msg = mgmt_helper.make_connector_delete_command(connector)
        mgmt_sender.send(msg)

    def run(self):
        """Run this handler in a Container; returns when the container stops."""
        Container(self).run()
# Script entry point: when executed directly (not imported), hand this
# module to unittest's runner.
if __name__ == "__main__":
    unittest.main(main_module())