blob: 42c0fb8592c6d754487c87b2e75bcaa0f594ea4e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from time import sleep
import json, os
import unittest2 as unittest
import logging
from threading import Timer
from subprocess import PIPE, STDOUT
from proton import Message, Timeout, Delivery
from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
from system_test import AsyncTestReceiver
from proton.handlers import MessagingHandler
from proton.reactor import Container, AtLeastOnce
from proton.utils import BlockingConnection
from qpid_dispatch.management.client import Node
CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
class TwoRouterTest(TestCase):
inter_router_port = None
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(TwoRouterTest, cls).setUpClass()
def router(name, client_server, connection):
policy_config_path = os.path.join(DIR, 'two-router-policy')
config = [
# Use the deprecated attributes helloInterval, raInterval, raIntervalFlux, remoteLsMaxAge
# The routers should still start successfully after using these deprecated entities.
('router', {'remoteLsMaxAge': 60, 'helloInterval': 1, 'raInterval': 30, 'raIntervalFlux': 4,
'mode': 'interior', 'id': 'QDR.%s'%name, 'allowUnsettledMulticast': 'yes'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'linkCapacity': 500}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'both'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
# for testing pattern matching
('address', {'pattern': 'a.b.c.d',
'distribution': 'closest'}),
('address', {'pattern': '#.b.c.d',
'distribution': 'multicast'}),
('address', {'pattern': 'a/*/#/d',
'distribution': 'closest'}),
('address', {'pattern': '*/b/c/d',
'distribution': 'multicast'}),
('address', {'pattern': 'a.x.d',
'distribution': 'closest'}),
('address', {'pattern': 'a.*.d',
'distribution': 'multicast'}),
connection
]
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
cls.routers = []
inter_router_port = cls.tester.get_port()
router('A', 'server',
('listener', {'role': 'inter-router', 'port': inter_router_port}))
router('B', 'client',
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port,
'verifyHostname': 'no'}))
cls.routers[0].wait_router_connected('QDR.B')
cls.routers[1].wait_router_connected('QDR.A')
def address(self):
return self.routers[0].addresses[0]
def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
p = self.popen(
['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
universal_newlines=True)
out = p.communicate(input)[0]
try:
p.teardown()
except Exception as e:
raise Exception(out if out else str(e))
return out
def test_01_pre_settled(self):
test = DeliveriesInTransit(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
# deliveriesTransit must most surely be greater than num_msgs
pos = outs.attribute_names.index("deliveriesTransit")
results = outs.results[0]
self.assertTrue(results[pos] > 104)
def test_02a_multicast_unsettled(self):
test = MulticastUnsettled(self.routers[0].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_02c_sender_settles_first(self):
test = SenderSettlesFirst(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_03_message_annotations(self):
test = MessageAnnotationsTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_no(self):
test = MessageAnnotationsStripTest(self.routers[0].addresses[1], self.routers[1].addresses[1])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_no_add_trace(self):
test = MessageAnnotationsStripAddTraceTest(self.routers[0].addresses[1], self.routers[1].addresses[1])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_both_add_ingress_trace(self):
test = MessageAnnotationsStripBothAddIngressTrace(self.routers[0].addresses[2], self.routers[1].addresses[2])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_out(self):
test = MessageAnnotationsStripMessageAnnotationsOut(self.routers[0].addresses[3], self.routers[1].addresses[3])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_in(self):
test = MessageAnnotationStripMessageAnnotationsIn(self.routers[0].addresses[4], self.routers[1].addresses[4])
test.run()
self.assertEqual(None, test.error)
def test_04_management(self):
test = ManagementTest(self.routers[0].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_06_semantics_closest_is_local(self):
test = SemanticsClosestIsLocal(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_07_semantics_closest_is_remote(self):
test = SemanticsClosestIsRemote(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_08_semantics_balanced(self):
test = SemanticsBalanced(self.routers[0].addresses[0], self.routers[0].addresses[1],
self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_09_to_override(self):
test = MessageAnnotaionsPreExistingOverride(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_10_propagated_disposition(self):
test = PropagatedDisposition(self, self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertTrue(test.passed)
def test_11_three_ack(self):
test = ThreeAck(self, self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
def test_12_excess_deliveries_released(self):
"""
Message-route a series of deliveries where the receiver provides credit for a subset and
once received, closes the link. The remaining deliveries should be released back to the sender.
"""
test = ExcessDeliveriesReleasedTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_15_attach_on_inter_router(self):
test = AttachOnInterRouterTest(self.routers[0].addresses[5])
test.run()
self.assertEqual(None, test.error)
def test_17_address_wildcard(self):
# verify proper distribution is selected by wildcard
addresses = [
# (address, count of messages expected to be received)
('a.b.c.d', 1), # closest 'a.b.c.d'
('b.c.d', 2), # multi '#.b.c.d'
('f.a.b.c.d', 2), # multi '#.b.c.d
('a.c.d', 2), # multi 'a.*.d'
('a/c/c/d', 1), # closest 'a/*/#.d
('a/x/z/z/d', 1), # closest 'a/*/#.d
('a/x/d', 1), # closest 'a.x.d'
('a.x.e', 1), # balanced ----
('m.b.c.d', 2) # multi '*/b/c/d'
]
# two receivers per address - one for each router
receivers = []
for a in addresses:
for x in range(2):
ar = AsyncTestReceiver(address=self.routers[x].addresses[0],
source=a[0])
receivers.append(ar)
# wait for the consumer info to propagate
for a in addresses:
self.routers[0].wait_address(a[0], 1, 1)
self.routers[1].wait_address(a[0], 1, 1)
# send one message to each address
conn = BlockingConnection(self.routers[0].addresses[0])
sender = conn.create_sender(address=None, options=AtLeastOnce())
for a in addresses:
sender.send(Message(address=a[0], body={'address': a[0]}))
# count received messages by address
msgs_recvd = {}
for M in receivers:
try:
while True:
i = M.queue.get(timeout=0.2).body.get('address', "ERROR")
if i not in msgs_recvd:
msgs_recvd[i] = 0
msgs_recvd[i] += 1
except AsyncTestReceiver.Empty:
pass
# verify expected count == actual count
self.assertTrue("ERROR" not in msgs_recvd)
for a in addresses:
self.assertTrue(a[0] in msgs_recvd)
self.assertEqual(a[1], msgs_recvd[a[0]])
for M in receivers:
M.stop()
conn.close()
def test_17_large_streaming_test(self):
test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_18_single_char_dest_test(self):
test = SingleCharacterDestinationTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_19_delete_inter_router_connection(self):
"""
This test tries to delete an inter-router connection but is
prevented from doing so.
"""
query_command = 'QUERY --type=connection'
outputs = json.loads(self.run_qdmanage(query_command))
identity = None
passed = False
for output in outputs:
if "inter-router" == output['role']:
identity = output['identity']
if identity:
update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
try:
json.loads(self.run_qdmanage(update_command))
except Exception as e:
if "Forbidden" in str(e):
passed = True
# The test has passed since we were forbidden from deleting
# inter-router connections even though we are allowed to update the adminStatus field.
self.assertTrue(passed)
def test_20_delete_connection(self):
"""
This test creates a blocking connection and tries to delete that connection.
Since there is no policy associated with this router, the default for allowAdminStatusUpdate is true,
the delete operation will be permitted.
"""
# Create a connection with some properties so we can easily identify the connection
connection = BlockingConnection(self.address(),
properties=CONNECTION_PROPERTIES_UNICODE_STRING)
query_command = 'QUERY --type=connection'
outputs = json.loads(self.run_qdmanage(query_command))
identity = None
passed = False
print ()
for output in outputs:
if output.get('properties'):
conn_properties = output['properties']
# Find the connection that has our properties - CONNECTION_PROPERTIES_UNICODE_STRING
# Delete that connection and run another qdmanage to see
# if the connection is gone.
if conn_properties.get('int_property'):
identity = output.get("identity")
if identity:
update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
try:
self.run_qdmanage(update_command)
query_command = 'QUERY --type=connection'
outputs = json.loads(
self.run_qdmanage(query_command))
no_properties = True
for output in outputs:
if output.get('properties'):
no_properties = False
conn_properties = output['properties']
if conn_properties.get('int_property'):
passed = False
break
else:
passed = True
if no_properties:
passed = True
except Exception as e:
passed = False
# The test has passed since we were allowed to delete a connection
# because we have the policy permission to do so.
self.assertTrue(passed)
def test_21_delete_connection_with_receiver(self):
test = DeleteConnectionWithReceiver(self.routers[0].addresses[0])
self.assertEqual(test.error, None)
test.run()
class DeleteConnectionWithReceiver(MessagingHandler):
def __init__(self, address):
super(DeleteConnectionWithReceiver, self).__init__()
self.address = address
self.mgmt_receiver = None
self.mgmt_receiver_1 = None
self.mgmt_receiver_2 = None
self.conn_to_kill = None
self.mgmt_conn = None
self.mgmt_sender = None
self.success = False
self.error = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
# Create a receiver connection with some properties so it
# can be easily identified.
self.conn_to_kill = event.container.connect(self.address, properties=CONNECTION_PROPERTIES_UNICODE_STRING)
self.receiver_to_kill = event.container.create_receiver(self.conn_to_kill, "hello_world")
self.mgmt_conn = event.container.connect(self.address)
self.mgmt_sender = event.container.create_sender(self.mgmt_conn)
self.mgmt_receiver = event.container.create_receiver(self.mgmt_conn, None, dynamic=True)
self.mgmt_receiver_1 = event.container.create_receiver(self.mgmt_conn,
None,
dynamic=True)
self.mgmt_receiver_2 = event.container.create_receiver(self.mgmt_conn,
None,
dynamic=True)
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
self.mgmt_conn.close()
def bail(self, error):
self.error = error
self.timer.cancel()
self.mgmt_conn.close()
self.conn_to_kill.close()
def on_link_opened(self, event):
if event.receiver == self.mgmt_receiver:
request = Message()
request.address = "amqp:/_local/$management"
request.properties = {
u'type': u'org.apache.qpid.dispatch.connection',
u'operation': u'QUERY'}
request.reply_to = self.mgmt_receiver.remote_source.address
self.mgmt_sender.send(request)
def on_message(self, event):
if event.receiver == self.mgmt_receiver:
attribute_names = event.message.body['attributeNames']
property_index = attribute_names .index('properties')
identity_index = attribute_names .index('identity')
for result in event.message.body['results']:
if result[property_index]:
properties = result[property_index]
if properties.get('int_property'):
identity = result[identity_index]
if identity:
request = Message()
request.address = "amqp:/_local/$management"
request.properties = {
u'identity': identity,
u'type': u'org.apache.qpid.dispatch.connection',
u'operation': u'UPDATE'
}
request.body = {
u'adminStatus': u'deleted'}
request.reply_to = self.mgmt_receiver_1.remote_source.address
self.mgmt_sender.send(request)
elif event.receiver == self.mgmt_receiver_1:
if event.message.properties['statusDescription'] == 'OK' and event.message.body['adminStatus'] == 'deleted':
request = Message()
request.address = "amqp:/_local/$management"
request.properties = {u'type': u'org.apache.qpid.dispatch.connection',
u'operation': u'QUERY'}
request.reply_to = self.mgmt_receiver_2.remote_source.address
self.mgmt_sender.send(request)
elif event.receiver == self.mgmt_receiver_2:
attribute_names = event.message.body['attributeNames']
property_index = attribute_names .index('properties')
identity_index = attribute_names .index('identity')
for result in event.message.body['results']:
if result[property_index]:
properties = result[property_index]
if properties and properties.get('int_property'):
self.bail("Connection not deleted")
self.bail(None)
def run(self):
Container(self).run()
class Timeout(object):
def __init__(self, parent):
self.parent = parent
def on_timer_task(self, event):
self.parent.timeout()
class SingleCharacterDestinationTest(MessagingHandler):
def __init__(self, address1, address2):
super(SingleCharacterDestinationTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "x"
self.error = None
self.conn1 = None
self.conn2 = None
self.count = 1
self.n_sent = 0
self.timer = None
self.sender = None
self.receiver = None
self.n_received = 0
self.body = "xyz"
def check_if_done(self):
if self.n_received == self.count:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.n_sent < self.count:
msg = Message(body=self.body)
event.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
self.n_received += 1
self.check_if_done()
def run(self):
Container(self).run()
class LargeMessageStreamTest(MessagingHandler):
def __init__(self, address1, address2):
super(LargeMessageStreamTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "LargeMessageStreamTest"
self.error = None
self.conn1 = None
self.conn2 = None
self.count = 10
self.n_sent = 0
self.timer = None
self.sender = None
self.receiver = None
self.n_received = 0
self.body = ""
for i in range(10000):
self.body += "0123456789101112131415"
def check_if_done(self):
if self.n_received == self.count:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.n_sent < self.count:
msg = Message(body=self.body)
# send(msg) calls the stream function which streams data from sender to the router
event.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
self.n_received += 1
self.check_if_done()
def run(self):
Container(self).run()
class ExcessDeliveriesReleasedTest(MessagingHandler):
def __init__(self, address1, address2):
super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
self.address1 = address1
self.address2 = address2
self.dest = "closest.EDRtest"
self.error = None
self.sender = None
self.receiver = None
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
self.n_released = 0
self.timer = None
self.conn1 = None
self.conn2 = None
def timeout(self):
self.error = "Timeout Expired"
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
self.receiver.flow(6)
def on_sendable(self, event):
for i in range(10 - self.n_sent):
msg = Message(body=i)
event.sender.send(msg)
self.n_sent += 1
def on_accepted(self, event):
self.n_accepted += 1
def on_released(self, event):
self.n_released += 1
if self.n_released == 4:
if self.n_accepted != 6:
self.error = "Expected 6 accepted, got %d" % self.n_accepted
if self.n_received != 6:
self.error = "Expected 6 received, got %d" % self.n_received
self.conn1.close()
self.conn2.close()
self.timer.cancel()
def on_message(self, event):
self.n_received += 1
if self.n_received == 6:
self.receiver.close()
def run(self):
Container(self).run()
class AttachOnInterRouterTest(MessagingHandler):
"""Expect an error when attaching a link to an inter-router listener"""
def __init__(self, address):
super(AttachOnInterRouterTest, self).__init__(prefetch=0)
self.address = address
self.dest = "AOIRtest"
self.error = None
self.sender = None
self.timer = None
self.conn = None
def timeout(self):
self.error = "Timeout Expired"
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn, self.dest)
def on_link_remote_close(self, event):
self.conn.close()
self.timer.cancel()
def run(self):
logging.disable(logging.ERROR) # Hide expected log errors
try:
Container(self).run()
finally:
logging.disable(logging.NOTSET) # Restore to normal
class DeliveriesInTransit(MessagingHandler):
def __init__(self, address1, address2):
super(DeliveriesInTransit, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "pre_settled.1"
self.error = "All messages not received"
self.n_sent = 0
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.num_msgs = 104
self.sent_count = 0
self.received_count = 0
self.receiver = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.n_sent <= self.num_msgs-1:
msg = Message(body="Hello World")
self.sender.send(msg)
self.n_sent += 1
def check_if_done(self):
if self.n_sent == self.received_count:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def on_message(self, event):
self.received_count+=1
self.check_if_done()
def run(self):
Container(self).run()
class MessageAnnotationsTest(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "ma/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
ma = event.message.annotations
if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']:
self.error = None
self.accept(event.delivery)
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripTest(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "message_annotations_strip_no/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
ingress_message_annotations = {'work': 'hard', 'stay': 'humble'}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
ma = event.message.annotations
if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B'] \
and ma['work'] == 'hard' and ma['stay'] == 'humble':
self.error = None
self.accept(event.delivery)
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class ManagementTest(MessagingHandler):
def __init__(self, address):
super(ManagementTest, self).__init__()
self.address = address
self.timer = None
self.conn = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
self.error = None
self.response1 = False
self.response2 = False
def timeout(self):
if not self.response1:
self.error = "Incorrect response received for message with correlation id C1"
if not self.response1:
self.error = self.error + "Incorrect response received for message with correlation id C2"
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn)
self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)
def on_link_opened(self, event):
if event.receiver == self.receiver:
request = Message()
request.correlation_id = "C1"
request.address = "amqp:/_local/$management"
request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'}
request.reply_to = self.receiver.remote_source.address
self.sender.send(request)
request = Message()
request.address = "amqp:/_topo/0/QDR.B/$management"
request.correlation_id = "C2"
request.reply_to = self.receiver.remote_source.address
request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'}
self.sender.send(request)
def on_message(self, event):
if event.receiver == self.receiver:
if event.message.correlation_id == "C1":
if event.message.properties['statusCode'] == 200 and \
event.message.properties['statusDescription'] is not None \
and 'amqp:/_topo/0/QDR.B/$management' in event.message.body:
self.response1 = True
elif event.message.correlation_id == "C2":
if event.message.properties['statusCode'] == 200 and \
event.message.properties['statusDescription'] is not None \
and 'amqp:/_topo/0/QDR.A/$management' in event.message.body:
self.response2 = True
if self.response1 and self.response2:
self.error = None
if self.error is None:
self.timer.cancel()
self.conn.close()
def run(self):
Container(self).run()
class MessageAnnotationStripMessageAnnotationsIn(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationStripMessageAnnotationsIn, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "strip_message_annotations_in/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
#
# Pre-existing ingress and trace
#
ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['X/QDR']}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
if event.message.annotations['x-opt-qd.ingress'] == '0/QDR.A' \
and event.message.annotations['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotaionsPreExistingOverride(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotaionsPreExistingOverride, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "toov/1"
self.error = "Pre-existing x-opt-qd.to has been stripped"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
msg.annotations = {'x-opt-qd.to': 'toov/1'}
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
ma = event.message.annotations
if ma['x-opt-qd.to'] == 'toov/1':
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripMessageAnnotationsOut(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripMessageAnnotationsOut, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "strip_message_annotations_out/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
if event.message.annotations is None:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripBothAddIngressTrace(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripBothAddIngressTrace, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "strip_message_annotations_both_add_ingress_trace/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
ingress_message_annotations = {'work': 'hard',
'x-opt-qd': 'humble',
'x-opt-qd.ingress': 'ingress-router',
'x-opt-qd.trace': ['0/QDR.A']}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
if event.message.annotations == {'work': 'hard', 'x-opt-qd': 'humble'}:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripAddTraceTest(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripAddTraceTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "message_annotations_strip_no/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
ingress_message_annotations = {'x-opt-qd.trace': ['0/QDR.1']}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
ma = event.message.annotations
if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.1', '0/QDR.A', '0/QDR.B']:
self.error = None
self.accept(event.delivery)
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class SenderSettlesFirst(MessagingHandler):
def __init__(self, address1, address2):
super(SenderSettlesFirst, self).__init__(auto_accept=False)
self.address1 = address1
self.address2 = address2
self.dest = "closest.senderfirst.1"
self.error = "Message body received differs from the one sent"
self.n_sent = 0
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.sent_count = 0
self.received_count = 0
self.receiver = None
self.msg_not_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
dlv = event.sender.send(msg)
dlv.settle()
self.msg_not_sent = False
def on_message(self, event):
if 0 == event.message.body['number']:
self.error = None
self.accept(event.delivery)
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MulticastUnsettled(MessagingHandler):
def __init__(self, address):
super(MulticastUnsettled, self).__init__()
self.address = address
self.dest = "multicast.2"
self.error = None
self.n_sent = 0
self.count = 3
self.n_received_a = 0
self.n_received_b = 0
self.n_received_c = 0
self.timer = None
self.conn = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn, self.dest)
self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
self.conn.close()
def check_if_done(self):
if self.n_received_a + self.n_received_b + self.n_received_c == self.count:
self.timer.cancel()
self.conn.close()
def on_sendable(self, event):
if self.n_sent == 0:
msg = Message(body="MulticastUnsettled-Test")
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
if event.receiver == self.receiver_b:
self.n_received_b += 1
if event.receiver == self.receiver_c:
self.n_received_c += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class SemanticsClosestIsLocal(MessagingHandler):
def __init__(self, address1, address2):
super(SemanticsClosestIsLocal, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "closest.1"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
self.num_messages = 100
self.n_received_a = 0
self.n_received_b = 0
self.n_received_c = 0
self.error = None
self.n_sent = 0
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
# Receiver on same router as the sender must receive all the messages. The other two
# receivers are on the other router
self.receiver_a = event.container.create_receiver(self.conn1, self.dest, name="A")
self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
self.receiver_c = event.container.create_receiver(self.conn2, self.dest, name="C")
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
self.conn1.close()
self.conn2.close()
def check_if_done(self):
if self.n_received_a == 100 and self.n_received_b + self.n_received_c == 0:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body="SemanticsClosestIsLocal-Test")
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
if event.receiver == self.receiver_b:
self.n_received_b += 1
if event.receiver == self.receiver_c:
self.n_received_c += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class SemanticsClosestIsRemote(MessagingHandler):
def __init__(self, address1, address2):
super(SemanticsClosestIsRemote, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "closest.1"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
self.num_messages = 100
self.n_received_a = 0
self.n_received_b = 0
self.error = None
self.n_sent = 0
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
# Receiver on same router as the sender must receive all the messages. The other two
# receivers are on the other router
self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b)
self.conn1.close()
self.conn2.close()
def check_if_done(self):
if self.n_received_a + self.n_received_b == 100 and self.n_received_a > 0 and self.n_received_b > 0:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body="SemanticsClosestIsRemote-Test")
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
if event.receiver == self.receiver_b:
self.n_received_b += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class CustomTimeout(object):
def __init__(self, parent):
self.parent = parent
def addr_text(self, addr):
if not addr:
return ""
if addr[0] == 'M':
return addr[2:]
else:
return addr[1:]
def on_timer_task(self, event):
local_node = Node.connect(self.parent.address1, timeout=TIMEOUT)
res = local_node.query('org.apache.qpid.dispatch.router.address')
name = res.attribute_names.index('name')
found = False
for results in res.results:
if "balanced.1" == self.addr_text(results[name]):
found = True
break
if found:
self.parent.cancel_custom()
self.parent.create_sender(event)
else:
event.reactor.schedule(2, self)
class SemanticsBalanced(MessagingHandler):
def __init__(self, address1, address2, address3):
super(SemanticsBalanced, self).__init__(auto_accept=False, prefetch=0)
self.address1 = address1
self.address2 = address2
self.address3 = address3
self.dest = "balanced.1"
self.timer = None
self.conn1 = None
self.conn2 = None
self.conn3 = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
self.num_messages = 400
self.n_received_a = 0
self.n_received_b = 0
self.n_received_c = 0
self.error = None
self.n_sent = 0
self.rx_set = []
self.custom_timer = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.custom_timer = event.reactor.schedule(2, CustomTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.conn3 = event.container.connect(self.address3)
# This receiver is on the same router as the sender
self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
# These two receivers are connected to a different router than the sender
self.receiver_b = event.container.create_receiver(self.conn3, self.dest, name="B")
self.receiver_c = event.container.create_receiver(self.conn3, self.dest, name="C")
self.receiver_a.flow(300)
self.receiver_b.flow(300)
self.receiver_c.flow(300)
def cancel_custom(self):
self.custom_timer.cancel()
def create_sender(self, event):
self.sender = event.container.create_sender(self.conn1, self.dest)
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
self.conn1.close()
self.conn2.close()
self.conn3.close()
def check_if_done(self):
if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages and \
self.n_received_a > 0 and self.n_received_b > 0 and self.n_received_c > 0:
self.rx_set.sort()
all_messages_received = True
for i in range(self.num_messages):
if not i == self.rx_set[i]:
all_messages_received = False
if all_messages_received:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
self.conn3.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
self.rx_set.append(event.message.body['number'])
elif event.receiver == self.receiver_b:
self.n_received_b += 1
self.rx_set.append(event.message.body['number'])
elif event.receiver == self.receiver_c:
self.n_received_c += 1
self.rx_set.append(event.message.body['number'])
self.check_if_done()
def run(self):
Container(self).run()
class PropagatedDisposition(MessagingHandler):
def __init__(self, test, address1, address2):
super(PropagatedDisposition, self).__init__(auto_accept=False)
self.address1 = address1
self.address2 = address2
self.settled = []
self.test = test
self.sender_conn = None
self.receiver_conn = None
self.passed = False
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.sender_conn = event.container.connect(self.address1)
self.receiver_conn = event.container.connect(self.address2)
addr = "unsettled/2"
self.sender = event.container.create_sender(self.sender_conn, addr)
self.receiver = event.container.create_receiver(self.receiver_conn, addr)
self.receiver.flow(2)
self.trackers = {}
for b in ['accept', 'reject']:
self.trackers[self.sender.send(Message(body=b))] = b
def timeout(self):
unique_list = sorted(list(dict.fromkeys(self.settled)))
self.error = "Timeout Expired: Expected ['accept', 'reject'] got %s" % unique_list
self.sender_conn.close()
self.receiver_conn.close()
def check(self):
unique_list = sorted(list(dict.fromkeys(self.settled)))
if unique_list == [u'accept', u'reject']:
self.passed = True
self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
def on_message(self, event):
if event.message.body == u'accept':
event.delivery.update(Delivery.ACCEPTED)
event.delivery.settle()
elif event.message.body == u'reject':
event.delivery.update(Delivery.REJECTED)
event.delivery.settle()
def on_accepted(self, event):
self.test.assertEqual(Delivery.ACCEPTED, event.delivery.remote_state)
self.test.assertEqual('accept', self.trackers[event.delivery])
self.settled.append('accept')
self.check()
def on_rejected(self, event):
self.test.assertEqual(Delivery.REJECTED, event.delivery.remote_state)
self.test.assertEqual('reject', self.trackers[event.delivery])
self.settled.append('reject')
self.check()
def run(self):
Container(self).run()
class ThreeAck(MessagingHandler):
def __init__(self, test, address1, address2):
super(ThreeAck, self).__init__(auto_accept=False, auto_settle=False)
self.addrs = [address1, address2]
self.settled = []
self.test = test
self.phase = 0
def on_start(self, event):
connections = [event.container.connect(a) for a in self.addrs]
addr = "three_ack/1"
self.sender = event.container.create_sender(connections[0], addr)
self.receiver = event.container.create_receiver(connections[1], addr)
self.receiver.flow(1)
self.tracker = self.sender.send(Message('hello'))
def on_message(self, event):
self.test.assertEqual(0, self.phase)
self.phase = 1
self.test.assertFalse(event.delivery.settled)
self.test.assertEqual(0, self.tracker.local_state)
self.test.assertEqual(0, self.tracker.remote_state)
event.delivery.update(Delivery.ACCEPTED)
# NOTE: we don't settle yet for 3-ack
def on_accepted(self, event):
self.test.assertTrue(event.sender)
self.test.assertEqual(1, self.phase)
self.phase = 2
self.test.assertEqual(Delivery.ACCEPTED, event.delivery.remote_state)
self.test.assertFalse(event.delivery.settled)
self.test.assertEqual(0, event.delivery.local_state)
event.delivery.settle()
self.test.assertFalse(event.delivery.settled)
event.connection.close()
def on_settled(self, event):
self.test.assertTrue(event.receiver)
self.test.assertEqual(2, self.phase)
self.phase = 3
event.connection.close()
def run(self):
Container(self).run()
self.test.assertEqual(3, self.phase)
class TwoRouterConnection(TestCase):
def __init__(self, test_method):
TestCase.__init__(self, test_method)
self.success = False
self.timer_delay = 4
self.max_attempts = 2
self.attempts = 0
self.local_node = None
@classmethod
def router(cls, name, config):
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
@classmethod
def setUpClass(cls):
super(TwoRouterConnection, cls).setUpClass()
cls.routers = []
cls.B_normal_port_1 = cls.tester.get_port()
cls.B_normal_port_2 = cls.tester.get_port()
TwoRouterConnection.router('A', [
('router', {'mode': 'interior', 'id': 'A'}),
('listener', {'host': '0.0.0.0', 'role': 'normal',
'port': cls.tester.get_port()}),
]
)
TwoRouterConnection.router('B',
[
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'host': '0.0.0.0', 'role': 'normal',
'port': cls.B_normal_port_1}),
('listener', {'host': '0.0.0.0', 'role': 'normal',
'port': cls.B_normal_port_2}),
]
)
def address(self):
return self.routers[0].addresses[0]
def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
p = self.popen(
['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
universal_newlines=True)
out = p.communicate(input)[0]
try:
p.teardown()
except Exception as e:
raise Exception(out if out else str(e))
return out
def can_terminate(self):
if self.attempts == self.max_attempts:
return True
if self.success:
return True
return False
def check_connections(self):
res = self.local_node.query(type='org.apache.qpid.dispatch.connection')
results = res.results
# If DISPATCH-1093 was not fixed, there would be an additional
# connection created and hence the len(results) would be 4
# Since DISPATCH-1093 is fixed, len(results would be 3 which is what
# we would expect.
if len(results) != 3:
self.schedule_num_connections_test()
else:
self.success = True
def schedule_num_connections_test(self):
if self.attempts < self.max_attempts:
if not self.success:
Timer(self.timer_delay, self.check_connections).start()
self.attempts += 1
def test_create_connectors(self):
self.local_node = Node.connect(self.routers[0].addresses[0],
timeout=TIMEOUT)
res = self.local_node.query(type='org.apache.qpid.dispatch.connection')
results = res.results
self.assertEqual(1, len(results))
long_type = 'org.apache.qpid.dispatch.connector' ''
create_command = 'CREATE --type=' + long_type + ' --name=foo' + ' host=0.0.0.0 port=' + str(TwoRouterConnection.B_normal_port_1)
self.run_qdmanage(create_command)
create_command = 'CREATE --type=' + long_type + ' --name=bar' + ' host=0.0.0.0 port=' + str(TwoRouterConnection.B_normal_port_2)
self.run_qdmanage(create_command)
self.schedule_num_connections_test()
while not self.can_terminate():
pass
self.assertTrue(self.success)
class PropagationTest(TestCase):
inter_router_port = None
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(PropagationTest, cls).setUpClass()
def router(name, extra_config):
config = [
('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
('listener', {'port': cls.tester.get_port()}),
] + extra_config
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
cls.routers = []
inter_router_port = cls.tester.get_port()
router('A', [('listener', {'role': 'inter-router', 'port': inter_router_port}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'})])
router('B', [('connector', {'role': 'inter-router', 'port': inter_router_port})])
cls.routers[0].wait_router_connected('QDR.B')
cls.routers[1].wait_router_connected('QDR.A')
def test_propagation_of_locally_undefined_address(self):
test = MulticastTestClient(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
self.assertEqual(test.received, 2)
class CreateReceiver(MessagingHandler):
def __init__(self, connection, address):
super(CreateReceiver, self).__init__()
self.connection = connection
self.address = address
def on_timer_task(self, event):
event.container.create_receiver(self.connection, self.address)
class DelayedSend(MessagingHandler):
def __init__(self, connection, address, message):
super(DelayedSend, self).__init__()
self.connection = connection
self.address = address
self.message = message
def on_timer_task(self, event):
event.container.create_sender(self.connection, self.address).send(self.message)
class MulticastTestClient(MessagingHandler):
def __init__(self, router1, router2):
super(MulticastTestClient, self).__init__()
self.routers = [router1, router2]
self.received = 0
self.error = None
def on_start(self, event):
self.connections = [event.container.connect(r) for r in self.routers]
event.container.create_receiver(self.connections[0], "multicast")
# wait for knowledge of receiver1 to propagate to second router
event.container.schedule(5, CreateReceiver(self.connections[1], "multicast"))
event.container.schedule(7, DelayedSend(self.connections[1], "multicast", Message(body="testing1,2,3")))
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
def on_message(self, event):
self.received += 1
event.connection.close()
if self.received == 2:
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired:received=%d" % self.received
for c in self.connections:
c.close()
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())