blob: c2aec27124ff50dc6dfa6b0b543bbc2272b75208 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from time import sleep
import json, os
import logging
from threading import Timer
from subprocess import PIPE, STDOUT
from proton import Message, Timeout, Delivery
from system_test import Logger, TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR, TestTimeout
from system_test import AsyncTestReceiver
from system_test import AsyncTestSender
from system_test import get_inter_router_links
from system_test import unittest
from test_broker import FakeService
from proton.handlers import MessagingHandler
from proton.reactor import Container, AtLeastOnce
from proton.utils import BlockingConnection
from qpid_dispatch.management.client import Node
CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
class TwoRouterTest(TestCase):
inter_router_port = None
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(TwoRouterTest, cls).setUpClass()
def router(name, client_server, connection):
policy_config_path = os.path.join(DIR, 'two-router-policy')
config = [
# Use the deprecated attributes helloInterval, raInterval, raIntervalFlux, remoteLsMaxAge
# The routers should still start successfully after using these deprecated entities.
('router', {'remoteLsMaxAge': 60, 'helloInterval': 1, 'raInterval': 30, 'raIntervalFlux': 4,
'mode': 'interior', 'id': 'QDR.%s'%name, 'allowUnsettledMulticast': 'yes'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'linkCapacity': 500}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'both'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
# for testing pattern matching
('address', {'pattern': 'a.b.c.d',
'distribution': 'closest'}),
('address', {'pattern': '#.b.c.d',
'distribution': 'multicast'}),
('address', {'pattern': 'a/*/#/d',
'distribution': 'closest'}),
('address', {'pattern': '*/b/c/d',
'distribution': 'multicast'}),
('address', {'pattern': 'a.x.d',
'distribution': 'closest'}),
('address', {'pattern': 'a.*.d',
'distribution': 'multicast'}),
connection
]
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
cls.routers = []
inter_router_port = cls.tester.get_port()
router('A', 'server',
('listener', {'role': 'inter-router', 'port': inter_router_port}))
router('B', 'client',
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}))
cls.routers[0].wait_router_connected('QDR.B')
cls.routers[1].wait_router_connected('QDR.A')
def address(self):
return self.routers[0].addresses[0]
def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
p = self.popen(
['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
universal_newlines=True)
out = p.communicate(input)[0]
try:
p.teardown()
except Exception as e:
raise Exception(out if out else str(e))
return out
def test_01_pre_settled(self):
test = DeliveriesInTransit(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
# deliveriesTransit must most surely be greater than num_msgs
pos = outs.attribute_names.index("deliveriesTransit")
results = outs.results[0]
self.assertTrue(results[pos] > 104)
def test_02a_multicast_unsettled(self):
test = MulticastUnsettled(self.routers[0].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_02c_sender_settles_first(self):
test = SenderSettlesFirst(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_03_message_annotations(self):
test = MessageAnnotationsTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_no(self):
test = MessageAnnotationsStripTest(self.routers[0].addresses[1], self.routers[1].addresses[1])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_no_add_trace(self):
test = MessageAnnotationsStripAddTraceTest(self.routers[0].addresses[1], self.routers[1].addresses[1])
test.run()
# Dump the logger output only if there is a test error, otherwise dont bother
if test.error:
test.logger.dump()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_both_add_ingress_trace(self):
test = MessageAnnotationsStripBothAddIngressTrace(self.routers[0].addresses[2], self.routers[1].addresses[2])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_out(self):
test = MessageAnnotationsStripMessageAnnotationsOut(self.routers[0].addresses[3], self.routers[1].addresses[3])
test.run()
self.assertEqual(None, test.error)
def test_03a_test_strip_message_annotations_in(self):
test = MessageAnnotationStripMessageAnnotationsIn(self.routers[0].addresses[4], self.routers[1].addresses[4])
test.run()
self.assertEqual(None, test.error)
def test_04_management(self):
test = ManagementTest(self.routers[0].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_06_semantics_closest_is_local(self):
test = SemanticsClosestIsLocal(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_07_semantics_closest_is_remote(self):
test = SemanticsClosestIsRemote(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_08_semantics_balanced(self):
test = SemanticsBalanced(self.routers[0].addresses[0], self.routers[0].addresses[1],
self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_09_to_override(self):
test = MessageAnnotaionsPreExistingOverride(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_10_propagated_disposition(self):
test = PropagatedDisposition(self, self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertTrue(test.passed)
def test_11_three_ack(self):
test = ThreeAck(self, self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
def test_12_excess_deliveries_released(self):
"""
Message-route a series of deliveries where the receiver provides credit for a subset and
once received, closes the link. The remaining deliveries should be released back to the sender.
"""
test = ExcessDeliveriesReleasedTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_15_attach_on_inter_router(self):
test = AttachOnInterRouterTest(self.routers[0].addresses[5])
test.run()
self.assertEqual(None, test.error)
def test_17_address_wildcard(self):
# verify proper distribution is selected by wildcard
addresses = [
# (address, count of messages expected to be received)
('a.b.c.d', 1), # closest 'a.b.c.d'
('b.c.d', 2), # multi '#.b.c.d'
('f.a.b.c.d', 2), # multi '#.b.c.d
('a.c.d', 2), # multi 'a.*.d'
('a/c/c/d', 1), # closest 'a/*/#.d
('a/x/z/z/d', 1), # closest 'a/*/#.d
('a/x/d', 1), # closest 'a.x.d'
('a.x.e', 1), # balanced ----
('m.b.c.d', 2) # multi '*/b/c/d'
]
# two receivers per address - one for each router
receivers = []
for a in addresses:
for x in range(2):
ar = AsyncTestReceiver(address=self.routers[x].addresses[0],
source=a[0])
receivers.append(ar)
# wait for the consumer info to propagate
for a in addresses:
self.routers[0].wait_address(a[0], 1, 1)
self.routers[1].wait_address(a[0], 1, 1)
# send one message to each address
conn = BlockingConnection(self.routers[0].addresses[0])
sender = conn.create_sender(address=None, options=AtLeastOnce())
for a in addresses:
sender.send(Message(address=a[0], body={'address': a[0]}))
# count received messages by address
msgs_recvd = {}
for M in receivers:
try:
while True:
i = M.queue.get(timeout=0.2).body.get('address', "ERROR")
if i not in msgs_recvd:
msgs_recvd[i] = 0
msgs_recvd[i] += 1
except AsyncTestReceiver.Empty:
pass
# verify expected count == actual count
self.assertTrue("ERROR" not in msgs_recvd)
for a in addresses:
self.assertTrue(a[0] in msgs_recvd)
self.assertEqual(a[1], msgs_recvd[a[0]])
for M in receivers:
M.stop()
conn.close()
def test_17_large_streaming_test(self):
test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_18_single_char_dest_test(self):
test = SingleCharacterDestinationTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
def test_19_delete_inter_router_connection(self):
"""
This test tries to delete an inter-router connection but is
prevented from doing so.
"""
query_command = 'QUERY --type=connection'
outputs = json.loads(self.run_qdmanage(query_command))
identity = None
passed = False
for output in outputs:
if "inter-router" == output['role']:
identity = output['identity']
if identity:
update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
try:
json.loads(self.run_qdmanage(update_command))
except Exception as e:
if "Forbidden" in str(e):
passed = True
# The test has passed since we were forbidden from deleting
# inter-router connections even though we are allowed to update the adminStatus field.
self.assertTrue(passed)
def test_20_delete_connection(self):
"""
This test creates a blocking connection and tries to delete that connection.
Since there is no policy associated with this router, the default for allowAdminStatusUpdate is true,
the delete operation will be permitted.
"""
# Create a connection with some properties so we can easily identify the connection
connection = BlockingConnection(self.address(),
properties=CONNECTION_PROPERTIES_UNICODE_STRING)
query_command = 'QUERY --type=connection'
outputs = json.loads(self.run_qdmanage(query_command))
identity = None
passed = False
print ()
for output in outputs:
if output.get('properties'):
conn_properties = output['properties']
# Find the connection that has our properties - CONNECTION_PROPERTIES_UNICODE_STRING
# Delete that connection and run another qdmanage to see
# if the connection is gone.
if conn_properties.get('int_property'):
identity = output.get("identity")
if identity:
update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
try:
self.run_qdmanage(update_command)
query_command = 'QUERY --type=connection'
outputs = json.loads(
self.run_qdmanage(query_command))
no_properties = True
for output in outputs:
if output.get('properties'):
no_properties = False
conn_properties = output['properties']
if conn_properties.get('int_property'):
passed = False
break
else:
passed = True
if no_properties:
passed = True
except Exception as e:
passed = False
# The test has passed since we were allowed to delete a connection
# because we have the policy permission to do so.
self.assertTrue(passed)
def test_21_delete_connection_with_receiver(self):
test = DeleteConnectionWithReceiver(self.routers[0].addresses[0])
self.assertEqual(test.error, None)
test.run()
def test_30_huge_address(self):
# try a link with an extremely long address
# DISPATCH-1461
addr = "A" * 2019
rx = AsyncTestReceiver(self.routers[0].addresses[0],
source=addr)
tx = AsyncTestSender(self.routers[1].addresses[0],
target=addr,
count=100)
tx.wait()
i = 100
while i:
try:
rx.queue.get(timeout=TIMEOUT)
i -= 1
except AsyncTestReceiver.Empty:
break;
self.assertEqual(0, i)
rx.stop()
class DeleteConnectionWithReceiver(MessagingHandler):
def __init__(self, address):
super(DeleteConnectionWithReceiver, self).__init__()
self.address = address
self.mgmt_receiver = None
self.mgmt_receiver_1 = None
self.mgmt_receiver_2 = None
self.conn_to_kill = None
self.mgmt_conn = None
self.mgmt_sender = None
self.success = False
self.error = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
# Create a receiver connection with some properties so it
# can be easily identified.
self.conn_to_kill = event.container.connect(self.address, properties=CONNECTION_PROPERTIES_UNICODE_STRING)
self.receiver_to_kill = event.container.create_receiver(self.conn_to_kill, "hello_world")
self.mgmt_conn = event.container.connect(self.address)
self.mgmt_sender = event.container.create_sender(self.mgmt_conn)
self.mgmt_receiver = event.container.create_receiver(self.mgmt_conn, None, dynamic=True)
self.mgmt_receiver_1 = event.container.create_receiver(self.mgmt_conn,
None,
dynamic=True)
self.mgmt_receiver_2 = event.container.create_receiver(self.mgmt_conn,
None,
dynamic=True)
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
self.mgmt_conn.close()
def bail(self, error):
self.error = error
self.timer.cancel()
self.mgmt_conn.close()
self.conn_to_kill.close()
def on_link_opened(self, event):
if event.receiver == self.mgmt_receiver:
request = Message()
request.address = "amqp:/_local/$management"
request.properties = {
u'type': u'org.apache.qpid.dispatch.connection',
u'operation': u'QUERY'}
request.reply_to = self.mgmt_receiver.remote_source.address
self.mgmt_sender.send(request)
def on_message(self, event):
if event.receiver == self.mgmt_receiver:
attribute_names = event.message.body['attributeNames']
property_index = attribute_names .index('properties')
identity_index = attribute_names .index('identity')
for result in event.message.body['results']:
if result[property_index]:
properties = result[property_index]
if properties.get('int_property'):
identity = result[identity_index]
if identity:
request = Message()
request.address = "amqp:/_local/$management"
request.properties = {
u'identity': identity,
u'type': u'org.apache.qpid.dispatch.connection',
u'operation': u'UPDATE'
}
request.body = {
u'adminStatus': u'deleted'}
request.reply_to = self.mgmt_receiver_1.remote_source.address
self.mgmt_sender.send(request)
elif event.receiver == self.mgmt_receiver_1:
if event.message.properties['statusDescription'] == 'OK' and event.message.body['adminStatus'] == 'deleted':
request = Message()
request.address = "amqp:/_local/$management"
request.properties = {u'type': u'org.apache.qpid.dispatch.connection',
u'operation': u'QUERY'}
request.reply_to = self.mgmt_receiver_2.remote_source.address
self.mgmt_sender.send(request)
elif event.receiver == self.mgmt_receiver_2:
attribute_names = event.message.body['attributeNames']
property_index = attribute_names .index('properties')
identity_index = attribute_names .index('identity')
for result in event.message.body['results']:
if result[property_index]:
properties = result[property_index]
if properties and properties.get('int_property'):
self.bail("Connection not deleted")
self.bail(None)
def run(self):
Container(self).run()
class SingleCharacterDestinationTest(MessagingHandler):
def __init__(self, address1, address2):
super(SingleCharacterDestinationTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "x"
self.error = None
self.conn1 = None
self.conn2 = None
self.count = 1
self.n_sent = 0
self.timer = None
self.sender = None
self.receiver = None
self.n_received = 0
self.body = "xyz"
def check_if_done(self):
if self.n_received == self.count:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.n_sent < self.count:
msg = Message(body=self.body)
event.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
self.n_received += 1
self.check_if_done()
def run(self):
Container(self).run()
class LargeMessageStreamTest(MessagingHandler):
def __init__(self, address1, address2):
super(LargeMessageStreamTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "LargeMessageStreamTest"
self.error = None
self.conn1 = None
self.conn2 = None
self.count = 10
self.n_sent = 0
self.timer = None
self.sender = None
self.receiver = None
self.n_received = 0
self.body = ""
for i in range(10000):
self.body += "0123456789101112131415"
def check_if_done(self):
if self.n_received == self.count:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.n_sent < self.count:
msg = Message(body=self.body)
# send(msg) calls the stream function which streams data from sender to the router
event.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
self.n_received += 1
self.check_if_done()
def run(self):
Container(self).run()
class ExcessDeliveriesReleasedTest(MessagingHandler):
def __init__(self, address1, address2):
super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
self.address1 = address1
self.address2 = address2
self.dest = "closest.EDRtest"
self.error = None
self.sender = None
self.receiver = None
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
self.n_released = 0
self.timer = None
self.conn1 = None
self.conn2 = None
def timeout(self):
self.error = "Timeout Expired"
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
self.receiver.flow(6)
def on_sendable(self, event):
for i in range(10 - self.n_sent):
msg = Message(body=i)
event.sender.send(msg)
self.n_sent += 1
def on_accepted(self, event):
self.n_accepted += 1
def on_released(self, event):
self.n_released += 1
if self.n_released == 4:
if self.n_accepted != 6:
self.error = "Expected 6 accepted, got %d" % self.n_accepted
if self.n_received != 6:
self.error = "Expected 6 received, got %d" % self.n_received
self.conn1.close()
self.conn2.close()
self.timer.cancel()
def on_message(self, event):
self.n_received += 1
if self.n_received == 6:
self.receiver.close()
def run(self):
Container(self).run()
class AttachOnInterRouterTest(MessagingHandler):
"""Expect an error when attaching a link to an inter-router listener"""
def __init__(self, address):
super(AttachOnInterRouterTest, self).__init__(prefetch=0)
self.address = address
self.dest = "AOIRtest"
self.error = None
self.sender = None
self.timer = None
self.conn = None
def timeout(self):
self.error = "Timeout Expired"
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn, self.dest)
def on_link_remote_close(self, event):
self.conn.close()
self.timer.cancel()
def run(self):
logging.disable(logging.ERROR) # Hide expected log errors
try:
Container(self).run()
finally:
logging.disable(logging.NOTSET) # Restore to normal
class DeliveriesInTransit(MessagingHandler):
def __init__(self, address1, address2):
super(DeliveriesInTransit, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "pre_settled.1"
self.error = "All messages not received"
self.n_sent = 0
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.num_msgs = 104
self.sent_count = 0
self.received_count = 0
self.receiver = None
def timeout(self):
self.error = "Timeout Expired: n_sent=%d n_received_count=%d" % (self.n_sent, self.received_count)
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_sendable(self, event):
if self.n_sent <= self.num_msgs-1:
msg = Message(body="Hello World")
self.sender.send(msg)
self.n_sent += 1
def check_if_done(self):
if self.n_sent == self.received_count:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def on_message(self, event):
self.received_count+=1
self.check_if_done()
def run(self):
Container(self).run()
class MessageAnnotationsTest(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "ma/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
def on_sendable(self, event):
if self.msg_not_sent:
msg = Message(body={'number': 0})
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if event.receiver == self.receiver:
if 0 == event.message.body['number']:
ma = event.message.annotations
if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripTest(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "message_annotations_strip_no/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
def on_sendable(self, event):
if event.sender == self.sender:
if self.msg_not_sent:
msg = Message(body={'number': 0})
ingress_message_annotations = {'work': 'hard', 'stay': 'humble'}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if event.receiver == self.receiver:
if 0 == event.message.body['number']:
ma = event.message.annotations
if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B'] \
and ma['work'] == 'hard' and ma['stay'] == 'humble':
self.error = None
self.accept(event.delivery)
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class ManagementTest(MessagingHandler):
def __init__(self, address):
super(ManagementTest, self).__init__()
self.address = address
self.timer = None
self.conn = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
self.error = None
self.response1 = False
self.response2 = False
def timeout(self):
if not self.response1:
self.error = "Incorrect response received for message with correlation id C1"
if not self.response1:
self.error = self.error + "Incorrect response received for message with correlation id C2"
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn)
self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)
def on_link_opened(self, event):
if event.receiver == self.receiver:
request = Message()
request.correlation_id = "C1"
request.address = "amqp:/_local/$management"
request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'}
request.reply_to = self.receiver.remote_source.address
self.sender.send(request)
request = Message()
request.address = "amqp:/_topo/0/QDR.B/$management"
request.correlation_id = "C2"
request.reply_to = self.receiver.remote_source.address
request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'}
self.sender.send(request)
def on_message(self, event):
if event.receiver == self.receiver:
if event.message.correlation_id == "C1":
if event.message.properties['statusCode'] == 200 and \
event.message.properties['statusDescription'] is not None \
and 'amqp:/_topo/0/QDR.B/$management' in event.message.body:
self.response1 = True
elif event.message.correlation_id == "C2":
if event.message.properties['statusCode'] == 200 and \
event.message.properties['statusDescription'] is not None \
and 'amqp:/_topo/0/QDR.A/$management' in event.message.body:
self.response2 = True
if self.response1 and self.response2:
self.error = None
if self.error is None:
self.timer.cancel()
self.conn.close()
def run(self):
Container(self).run()
class MessageAnnotationStripMessageAnnotationsIn(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationStripMessageAnnotationsIn, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "strip_message_annotations_in/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
def on_sendable(self, event):
if event.sender == self.sender:
if self.msg_not_sent:
msg = Message(body={'number': 0})
#
# Pre-existing ingress and trace
#
ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['X/QDR']}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if event.receiver == self.receiver:
if 0 == event.message.body['number']:
if event.message.annotations['x-opt-qd.ingress'] == '0/QDR.A' \
and event.message.annotations['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotaionsPreExistingOverride(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotaionsPreExistingOverride, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "toov/1"
self.error = "Pre-existing x-opt-qd.to has been stripped"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.msg_not_sent = True
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
def on_sendable(self, event):
if event.sender == self.sender:
if self.msg_not_sent:
msg = Message(body={'number': 0})
msg.annotations = {'x-opt-qd.to': 'toov/1'}
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if event.receiver == self.receiver:
if 0 == event.message.body['number']:
ma = event.message.annotations
if ma['x-opt-qd.to'] == 'toov/1':
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripMessageAnnotationsOut(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripMessageAnnotationsOut, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "strip_message_annotations_out/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
def on_sendable(self, event):
if event.sender == self.sender:
if self.msg_not_sent:
msg = Message(body={'number': 0})
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if event.receiver == self.receiver:
if 0 == event.message.body['number']:
if event.message.annotations is None:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripBothAddIngressTrace(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripBothAddIngressTrace, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "strip_message_annotations_both_add_ingress_trace/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
def on_sendable(self, event):
if event.sender == self.sender:
if self.msg_not_sent:
msg = Message(body={'number': 0})
ingress_message_annotations = {'work': 'hard',
'x-opt-qd': 'humble',
'x-opt-qd.ingress': 'ingress-router',
'x-opt-qd.trace': ['0/QDR.A']}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
def on_message(self, event):
if self.receiver == event.receiver:
if 0 == event.message.body['number']:
if event.message.annotations == {'work': 'hard', 'x-opt-qd': 'humble'}:
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MessageAnnotationsStripAddTraceTest(MessagingHandler):
def __init__(self, address1, address2):
super(MessageAnnotationsStripAddTraceTest, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "message_annotations_strip_no/1"
self.error = "Message annotations not found"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.msg_not_sent = True
self.logger = Logger(title="MessageAnnotationsStripAddTraceTest")
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
self.logger.log("on_start(): Receiver link created on self.conn2")
def on_sendable(self, event):
if event.sender == self.sender:
if self.msg_not_sent:
msg = Message(body={'number': 0})
ingress_message_annotations = {'x-opt-qd.trace': ['0/QDR.1']}
msg.annotations = ingress_message_annotations
event.sender.send(msg)
self.msg_not_sent = False
self.logger.log("on_sendable(): Message sent")
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
self.logger.log("on_link_opened(): Sender link created on self.conn1")
def on_message(self, event):
if event.receiver == self.receiver:
self.logger.log("on_message(): Message received by receiver")
if 0 == event.message.body['number']:
self.logger.log("on_message(): Message received by receiver body matches expected body")
ma = event.message.annotations
if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.1', '0/QDR.A', '0/QDR.B']:
self.logger.log("on_message(): Message annotations in message match expected message annotations. Success...")
self.error = None
self.timer.cancel()
self.conn1.close()
self.conn2.close()
else:
self.logger.log("on_message(): Message received by receiver, message annotations are not as expected")
self.logger.log(ma)
else:
self.logger.log("on_message(): Message received by receiver but body is not expected")
def run(self):
Container(self).run()
class SenderSettlesFirst(MessagingHandler):
def __init__(self, address1, address2):
super(SenderSettlesFirst, self).__init__(auto_accept=False)
self.address1 = address1
self.address2 = address2
self.dest = "closest.senderfirst.1"
self.error = "Message body received differs from the one sent"
self.n_sent = 0
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.sent_count = 0
self.received_count = 0
self.receiver = None
self.msg_not_sent = True
def timeout(self):
self.error = "Timeout Expired: " + self.error
self.conn1.close()
self.conn2.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn2 = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn2, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.conn1 = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn1, self.dest)
def on_sendable(self, event):
if event.sender == self.sender:
if self.msg_not_sent:
msg = Message(body={'number': 0})
dlv = event.sender.send(msg)
dlv.settle()
self.msg_not_sent = False
def on_message(self, event):
if event.receiver == self.receiver:
if 0 == event.message.body['number']:
self.error = None
self.accept(event.delivery)
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def run(self):
Container(self).run()
class MulticastUnsettled(MessagingHandler):
def __init__(self, address):
super(MulticastUnsettled, self).__init__()
self.address = address
self.dest = "multicast.2"
self.error = None
self.n_sent = 0
self.count = 3
self.n_received_a = 0
self.n_received_b = 0
self.n_received_c = 0
self.timer = None
self.conn = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn, self.dest)
self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
self.conn.close()
def check_if_done(self):
if self.n_received_a + self.n_received_b + self.n_received_c == self.count:
self.timer.cancel()
self.conn.close()
def on_sendable(self, event):
if self.n_sent == 0:
msg = Message(body="MulticastUnsettled-Test")
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
if event.receiver == self.receiver_b:
self.n_received_b += 1
if event.receiver == self.receiver_c:
self.n_received_c += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class SemanticsClosestIsLocal(MessagingHandler):
def __init__(self, address1, address2):
super(SemanticsClosestIsLocal, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "closest.1"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
self.num_messages = 100
self.n_received_a = 0
self.n_received_b = 0
self.n_received_c = 0
self.error = None
self.n_sent = 0
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
# Receiver on same router as the sender must receive all the messages. The other two
# receivers are on the other router
self.receiver_a = event.container.create_receiver(self.conn1, self.dest, name="A")
self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
self.receiver_c = event.container.create_receiver(self.conn2, self.dest, name="C")
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
self.conn1.close()
self.conn2.close()
def check_if_done(self):
if self.n_received_a == 100 and self.n_received_b + self.n_received_c == 0:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body="SemanticsClosestIsLocal-Test")
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
if event.receiver == self.receiver_b:
self.n_received_b += 1
if event.receiver == self.receiver_c:
self.n_received_c += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class SemanticsClosestIsRemote(MessagingHandler):
def __init__(self, address1, address2):
super(SemanticsClosestIsRemote, self).__init__()
self.address1 = address1
self.address2 = address2
self.dest = "closest.1"
self.timer = None
self.conn1 = None
self.conn2 = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
self.num_messages = 100
self.n_received_a = 0
self.n_received_b = 0
self.error = None
self.n_sent = 0
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
# Receiver on same router as the sender must receive all the messages. The other two
# receivers are on the other router
self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b)
self.conn1.close()
self.conn2.close()
def check_if_done(self):
if self.n_received_a + self.n_received_b == 100 and self.n_received_a > 0 and self.n_received_b > 0:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body="SemanticsClosestIsRemote-Test")
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
if event.receiver == self.receiver_b:
self.n_received_b += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class CustomTimeout(object):
def __init__(self, parent):
self.parent = parent
def addr_text(self, addr):
if not addr:
return ""
if addr[0] == 'M':
return addr[2:]
else:
return addr[1:]
def on_timer_task(self, event):
local_node = Node.connect(self.parent.address1, timeout=TIMEOUT)
res = local_node.query('org.apache.qpid.dispatch.router.address')
name = res.attribute_names.index('name')
found = False
for results in res.results:
if "balanced.1" == self.addr_text(results[name]):
found = True
break
if found:
self.parent.cancel_custom()
self.parent.create_sender(event)
else:
event.reactor.schedule(2, self)
class SemanticsBalanced(MessagingHandler):
def __init__(self, address1, address2, address3):
super(SemanticsBalanced, self).__init__(auto_accept=False, prefetch=0)
self.address1 = address1
self.address2 = address2
self.address3 = address3
self.dest = "balanced.1"
self.timer = None
self.conn1 = None
self.conn2 = None
self.conn3 = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
self.receiver_c = None
self.num_messages = 400
self.n_received_a = 0
self.n_received_b = 0
self.n_received_c = 0
self.error = None
self.n_sent = 0
self.rx_set = []
self.custom_timer = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.custom_timer = event.reactor.schedule(2, CustomTimeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.conn3 = event.container.connect(self.address3)
# This receiver is on the same router as the sender
self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
# These two receivers are connected to a different router than the sender
self.receiver_b = event.container.create_receiver(self.conn3, self.dest, name="B")
self.receiver_c = event.container.create_receiver(self.conn3, self.dest, name="C")
self.receiver_a.flow(300)
self.receiver_b.flow(300)
self.receiver_c.flow(300)
def cancel_custom(self):
self.custom_timer.cancel()
def create_sender(self, event):
self.sender = event.container.create_sender(self.conn1, self.dest)
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
self.conn1.close()
self.conn2.close()
self.conn3.close()
def check_if_done(self):
if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages and \
self.n_received_a > 0 and self.n_received_b > 0 and self.n_received_c > 0:
self.rx_set.sort()
all_messages_received = True
for i in range(self.num_messages):
if not i == self.rx_set[i]:
all_messages_received = False
if all_messages_received:
self.timer.cancel()
self.conn1.close()
self.conn2.close()
self.conn3.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver_a:
self.n_received_a += 1
self.rx_set.append(event.message.body['number'])
elif event.receiver == self.receiver_b:
self.n_received_b += 1
self.rx_set.append(event.message.body['number'])
elif event.receiver == self.receiver_c:
self.n_received_c += 1
self.rx_set.append(event.message.body['number'])
self.check_if_done()
def run(self):
Container(self).run()
class PropagatedDisposition(MessagingHandler):
def __init__(self, test, address1, address2):
super(PropagatedDisposition, self).__init__(auto_accept=False)
self.address1 = address1
self.address2 = address2
self.settled = []
self.test = test
self.sender_conn = None
self.receiver_conn = None
self.passed = False
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.sender_conn = event.container.connect(self.address1)
self.receiver_conn = event.container.connect(self.address2)
addr = "unsettled/2"
self.sender = event.container.create_sender(self.sender_conn, addr)
self.receiver = event.container.create_receiver(self.receiver_conn, addr)
self.receiver.flow(2)
self.trackers = {}
for b in ['accept', 'reject']:
self.trackers[self.sender.send(Message(body=b))] = b
def timeout(self):
unique_list = sorted(list(dict.fromkeys(self.settled)))
self.error = "Timeout Expired: Expected ['accept', 'reject'] got %s" % unique_list
self.sender_conn.close()
self.receiver_conn.close()
def check(self):
unique_list = sorted(list(dict.fromkeys(self.settled)))
if unique_list == [u'accept', u'reject']:
self.passed = True
self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
def on_message(self, event):
if event.message.body == u'accept':
event.delivery.update(Delivery.ACCEPTED)
event.delivery.settle()
elif event.message.body == u'reject':
event.delivery.update(Delivery.REJECTED)
event.delivery.settle()
def on_accepted(self, event):
self.test.assertEqual(Delivery.ACCEPTED, event.delivery.remote_state)
self.test.assertEqual('accept', self.trackers[event.delivery])
self.settled.append('accept')
self.check()
def on_rejected(self, event):
self.test.assertEqual(Delivery.REJECTED, event.delivery.remote_state)
self.test.assertEqual('reject', self.trackers[event.delivery])
self.settled.append('reject')
self.check()
def run(self):
Container(self).run()
class ThreeAck(MessagingHandler):
def __init__(self, test, address1, address2):
super(ThreeAck, self).__init__(auto_accept=False, auto_settle=False)
self.addrs = [address1, address2]
self.settled = []
self.test = test
self.phase = 0
def on_start(self, event):
connections = [event.container.connect(a) for a in self.addrs]
addr = "three_ack/1"
self.sender = event.container.create_sender(connections[0], addr)
self.receiver = event.container.create_receiver(connections[1], addr)
self.receiver.flow(1)
self.tracker = self.sender.send(Message('hello'))
def on_message(self, event):
self.test.assertEqual(0, self.phase)
self.phase = 1
self.test.assertFalse(event.delivery.settled)
self.test.assertEqual(0, self.tracker.local_state)
self.test.assertEqual(0, self.tracker.remote_state)
event.delivery.update(Delivery.ACCEPTED)
# NOTE: we don't settle yet for 3-ack
def on_accepted(self, event):
self.test.assertTrue(event.sender)
self.test.assertEqual(1, self.phase)
self.phase = 2
self.test.assertEqual(Delivery.ACCEPTED, event.delivery.remote_state)
self.test.assertFalse(event.delivery.settled)
self.test.assertEqual(0, event.delivery.local_state)
event.delivery.settle()
self.test.assertFalse(event.delivery.settled)
event.connection.close()
def on_settled(self, event):
self.test.assertTrue(event.receiver)
self.test.assertEqual(2, self.phase)
self.phase = 3
event.connection.close()
def run(self):
Container(self).run()
self.test.assertEqual(3, self.phase)
class TwoRouterConnection(TestCase):
def __init__(self, test_method):
TestCase.__init__(self, test_method)
self.success = False
self.timer_delay = 4
self.max_attempts = 2
self.attempts = 0
self.local_node = None
@classmethod
def router(cls, name, config):
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
@classmethod
def setUpClass(cls):
super(TwoRouterConnection, cls).setUpClass()
cls.routers = []
cls.B_normal_port_1 = cls.tester.get_port()
cls.B_normal_port_2 = cls.tester.get_port()
TwoRouterConnection.router('A', [
('router', {'mode': 'interior', 'id': 'A'}),
('listener', {'host': '0.0.0.0', 'role': 'normal',
'port': cls.tester.get_port()}),
]
)
TwoRouterConnection.router('B',
[
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'host': '0.0.0.0', 'role': 'normal',
'port': cls.B_normal_port_1}),
('listener', {'host': '0.0.0.0', 'role': 'normal',
'port': cls.B_normal_port_2}),
]
)
def address(self):
return self.routers[0].addresses[0]
def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
p = self.popen(
['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
universal_newlines=True)
out = p.communicate(input)[0]
try:
p.teardown()
except Exception as e:
raise Exception(out if out else str(e))
return out
def can_terminate(self):
if self.attempts == self.max_attempts:
return True
if self.success:
return True
return False
def check_connections(self):
res = self.local_node.query(type='org.apache.qpid.dispatch.connection')
results = res.results
# If DISPATCH-1093 was not fixed, there would be an additional
# connection created and hence the len(results) would be 4
# Since DISPATCH-1093 is fixed, len(results would be 3 which is what
# we would expect.
if len(results) != 3:
self.schedule_num_connections_test()
else:
self.success = True
def schedule_num_connections_test(self):
if self.attempts < self.max_attempts:
if not self.success:
Timer(self.timer_delay, self.check_connections).start()
self.attempts += 1
def test_create_connectors(self):
self.local_node = Node.connect(self.routers[0].addresses[0],
timeout=TIMEOUT)
res = self.local_node.query(type='org.apache.qpid.dispatch.connection')
results = res.results
self.assertEqual(1, len(results))
long_type = 'org.apache.qpid.dispatch.connector' ''
create_command = 'CREATE --type=' + long_type + ' --name=foo' + ' host=0.0.0.0 port=' + str(TwoRouterConnection.B_normal_port_1)
self.run_qdmanage(create_command)
create_command = 'CREATE --type=' + long_type + ' --name=bar' + ' host=0.0.0.0 port=' + str(TwoRouterConnection.B_normal_port_2)
self.run_qdmanage(create_command)
self.schedule_num_connections_test()
while not self.can_terminate():
pass
self.assertTrue(self.success)
class PropagationTest(TestCase):
inter_router_port = None
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(PropagationTest, cls).setUpClass()
def router(name, extra_config):
config = [
('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
('listener', {'port': cls.tester.get_port()}),
] + extra_config
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
cls.routers = []
inter_router_port = cls.tester.get_port()
router('A', [('listener', {'role': 'inter-router', 'port': inter_router_port}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'})])
router('B', [('connector', {'role': 'inter-router', 'port': inter_router_port})])
cls.routers[0].wait_router_connected('QDR.B')
cls.routers[1].wait_router_connected('QDR.A')
def test_propagation_of_locally_undefined_address(self):
test = MulticastTestClient(self.routers[0].addresses[0], self.routers[1].addresses[0])
test.run()
self.assertEqual(None, test.error)
self.assertEqual(test.received, 2)
class CreateReceiver(MessagingHandler):
def __init__(self, connection, address):
super(CreateReceiver, self).__init__()
self.connection = connection
self.address = address
def on_timer_task(self, event):
event.container.create_receiver(self.connection, self.address)
class DelayedSend(MessagingHandler):
def __init__(self, connection, address, message):
super(DelayedSend, self).__init__()
self.connection = connection
self.address = address
self.message = message
def on_timer_task(self, event):
event.container.create_sender(self.connection, self.address).send(self.message)
class MulticastTestClient(MessagingHandler):
def __init__(self, router1, router2):
super(MulticastTestClient, self).__init__()
self.routers = [router1, router2]
self.received = 0
self.error = None
def on_start(self, event):
self.connections = [event.container.connect(r) for r in self.routers]
event.container.create_receiver(self.connections[0], "multicast")
# wait for knowledge of receiver1 to propagate to second router
event.container.schedule(5, CreateReceiver(self.connections[1], "multicast"))
event.container.schedule(7, DelayedSend(self.connections[1], "multicast", Message(body="testing1,2,3")))
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
def on_message(self, event):
self.received += 1
event.connection.close()
if self.received == 2:
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired:received=%d" % self.received
for c in self.connections:
c.close()
def run(self):
Container(self).run()
class StreamingLinkScrubberTest(TestCase):
"""
Verify that unused inter-router streaming links are eventually reclaimed
"""
@classmethod
def setUpClass(cls):
super(StreamingLinkScrubberTest, cls).setUpClass()
def router(name, extra):
config = [
('router', {'id': 'Router%s' % name,
'mode': 'interior'}),
('listener', {'port': cls.tester.get_port(),
'stripAnnotations': 'no'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'})
]
if extra:
config.extend(extra)
config = Qdrouterd.Config(config)
# run routers in test mode to shorten the streaming link scrubber
# interval to 5 seconds an the maximum pool size to two links
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=["--test-hooks"]))
cls.routers = []
inter_router_port = cls.tester.get_port()
router('A',
[('listener', {'role': 'inter-router',
'port': inter_router_port})])
cls.RouterA = cls.routers[-1]
cls.RouterA.listener = cls.RouterA.addresses[0]
router('B',
[('connector', {'name': 'connectorToA', 'role':
'inter-router',
'port': inter_router_port})])
cls.RouterB = cls.routers[-1]
cls.RouterB.listener = cls.RouterB.addresses[0]
cls.RouterA.wait_router_connected('RouterB')
cls.RouterB.wait_router_connected('RouterA')
def test_01_streaming_link_scrubber(self):
"""
Ensure extra streaming links are closed by the periodic scrubber
"""
address = "closest/scrubber"
# scrubber removes at most 10 links per scan, the test pool size is 2
sender_count = 12
# fire up a receiver on RouterB to get 1 message from each sender:
env = dict(os.environ, PN_TRACE_FRM="1")
cmd = ["test-receiver",
"-a", self.RouterB.listener,
"-s", address,
"-c", str(sender_count)]
rx = self.popen(cmd, env=env)
self.RouterA.wait_address(address)
# remember the count of inter-router links on A before we start streaming
pre_count = len(get_inter_router_links(self.RouterA.listener))
# fire off the senders
cmd = ["test-sender",
"-a", self.RouterA.listener,
"-t", address,
"-c", "1",
"-sx"
]
senders = [self.popen(cmd, env=env) for x in range(sender_count)]
for tx in senders:
out_text, out_error = tx.communicate(timeout=TIMEOUT)
if tx.returncode:
raise Exception("Sender failed: %s %s" % (out_text, out_error))
# expect: more inter-router links opened. Should be 12 more, but
# depending on when the scrubber runs it may be as low as two
post_count = len(get_inter_router_links(self.RouterA.listener))
self.assertTrue(post_count > pre_count)
# expect: after 5 seconds 10 of the links should be closed and 2
# should remain (--test-hooks router option sets these parameters)
while (post_count - pre_count) > 2:
sleep(0.1)
post_count = len(get_inter_router_links(self.RouterA.listener))
out_text, out_error = rx.communicate(timeout=TIMEOUT)
if rx.returncode:
raise Exception("Receiver failed: %s %s" % (out_text, out_error))
class TwoRouterExtensionStateTest(TestCase):
"""
Verify that routers propagate extended Disposition state correctly.
See DISPATCH-1703
"""
@classmethod
def setUpClass(cls):
super(TwoRouterExtensionStateTest, cls).setUpClass()
def router(name, extra_config):
config = [
('router', {'mode': 'interior',
'id': name}),
('listener', {'port': cls.tester.get_port() }),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
] + extra_config
config = Qdrouterd.Config(config)
return cls.tester.qdrouterd(name, config, wait=False)
inter_router_port = cls.tester.get_port()
service_port = cls.tester.get_port()
cls.RouterA = router('RouterA',
[
('listener', {'role': 'inter-router',
'host': '0.0.0.0',
'port': inter_router_port,
'saslMechanisms': 'ANONYMOUS'}),
])
cls.RouterB = router('RouterB',
[
('connector', {'name': 'toRouterA',
'role': 'inter-router',
'port': inter_router_port}),
('listener', {'role': 'route-container',
'host': '0.0.0.0',
'port': service_port,
'saslMechanisms': 'ANONYMOUS'}),
('linkRoute', {'prefix': 'RoutieMcRouteFace',
'containerId': 'FakeService',
'direction': 'in'}),
('linkRoute', {'prefix': 'RoutieMcRouteFace',
'containerId': 'FakeService',
'direction': 'out'}),
])
cls.RouterA.wait_router_connected('RouterB')
cls.RouterB.wait_router_connected('RouterA')
def test_01_link_route(self):
"""
Verify non-terminal state and data propagates over a link route
"""
class MyExtendedService(FakeService):
"""
This service saves any outcome and extension data that arrives in a
transfer
"""
def __init__(self, url, container_id=None):
self.remote_state = None
self.remote_data = None
super(MyExtendedService, self).__init__(url, container_id)
def on_message(self, event):
self.remote_state = event.delivery.remote_state;
self.remote_data = event.delivery.remote.data;
super(MyExtendedService, self).on_message(event)
fs = MyExtendedService(self.RouterB.addresses[1],
container_id="FakeService")
self.RouterA.wait_address("RoutieMcRouteFace", remotes=1, count=2)
tx = MyExtendedSender(self.RouterA.addresses[0],
"RoutieMcRouteFace")
tx.wait()
fs.join()
self.assertEqual(999, fs.remote_state)
self.assertEqual([1, 2, 3], fs.remote_data)
def test_02_closest(self):
"""
Verify non-terminal state and data propagates over anycase
"""
test = ExtensionStateTester(self.RouterA.addresses[0],
self.RouterB.addresses[0],
"closest/fleabag")
test.run()
self.assertEqual(None, test.error)
def test_03_multicast(self):
"""
Verify that disposition state set by the publisher is available to all
consumers
"""
rxs = [MyExtendedReceiver(self.RouterA.addresses[0],
"multicast/thingy")
for x in range(3)]
self.RouterA.wait_address("multicast/thingy", subscribers=3)
sleep(0.5) # let subscribers grant credit
tx = MyExtendedSender(self.RouterB.addresses[0],
"multicast/thingy")
tx.wait()
# DISPATCH-1705: only one of the receivers gets the data, but all
# should get the state
ext_data = None
for rx in rxs:
rx.stop()
try:
while True:
dispo = rx.remote_states.pop()
self.assertEqual(999, dispo[0])
ext_data = dispo[1] or ext_data
except IndexError:
pass
self.assertEqual([1, 2, 3], ext_data)
class MyExtendedSender(AsyncTestSender):
"""
This sender sets a non-terminal outcome and data on the outgoing
transfer
"""
def on_sendable(self, event):
if self.sent < self.total:
dlv = event.sender.delivery(str(self.sent))
dlv.local.data = [1, 2, 3]
dlv.update(999)
event.sender.stream(self._message.encode())
event.sender.advance()
self.sent += 1
class MyExtendedReceiver(AsyncTestReceiver):
"""
This receiver stores any remote delivery state that arrives with a message
transfer
"""
def __init__(self, *args, **kwargs):
self.remote_states = []
super(MyExtendedReceiver, self).__init__(*args, **kwargs)
def on_message(self, event):
self.remote_states.append((event.delivery.remote_state,
event.delivery.remote.data))
super(MyExtendedReceiver, self).on_message(event)
class ExtensionStateTester(MessagingHandler):
"""
Verify the routers propagate non-terminal outcome and extended state
disposition information in both message transfer and disposition frames.
This tester creates a receiver and a sender link to a given address.
The sender transfers a message with a non-terminal delivery state and
associated extension data. The receiver expects to find this state in the
incoming delivery.
The receiver then responds with a non-terminal disposition that also has
extension state data. The sender expects to find this new state associated
with its delivery.
"""
def __init__(self, ingress_router, egress_router, address):
super(ExtensionStateTester, self).__init__(auto_settle=False,
auto_accept=False)
self._in_router = ingress_router
self._out_router = egress_router
self._address = address
self._sender_conn = None
self._recvr_conn = None
self._sender = None
self._receiver = None
self._sent = 0
self._received = 0
self._settled = 0
self._total = 10
self._message = Message(body="XYZ" * (1024 * 1024 * 2))
self.error = None
def on_start(self, event):
self._reactor = event.reactor
self._sender_conn = event.container.connect(self._in_router)
self._sender = event.container.create_sender(self._sender_conn,
target=self._address,
name="ExtensionSender")
self._recvr_conn = event.container.connect(self._out_router)
self._receiver = event.container.create_receiver(self._recvr_conn,
source=self._address,
name="ExtensionReceiver")
def _done(self, error=None):
self.error = error or self.error
self._sender.close()
self._sender_conn.close()
self._receiver.close()
self._recvr_conn.close()
def on_sendable(self, event):
if self._sent < self._total:
self._sent += 1
dlv = event.sender.delivery(str(self._sent))
dlv.local.data = [1, 2, 3, self._sent]
dlv.update(666) # non-terminal state
self._message.id = self._sent
event.sender.stream(self._message.encode())
event.sender.advance()
def on_message(self, event):
dlv = event.delivery
msg_id = event.message.id
if dlv.remote_state != 666:
return self._done(error="Unexpected outcome '%s', expected '666'"
% dlv.remote_state)
remote_data = dlv.remote.data
expected_data = [1, 2, 3, msg_id]
if remote_data != expected_data:
return self._done(error="Unexpected dispo data '%s', expected '%s'"
% (remote_data, expected_data))
# send back a non-terminal outcome and more data
dlv.local.data = [10, 9, 8, msg_id]
dlv.update(777)
self._received += 1
def _handle_sender_update(self, event):
dlv = event.delivery
if dlv.local_state != 666 or len(dlv.local.data) != 4:
return self._done(error="Unexpected local state at sender: %s %s" %
(dlv.local_state, dlv.local.data))
if dlv.remote_state != 777 or len(dlv.remote.data) != 4:
return self._done(error="Unexpected remote state at sender: %s %s" %
(dlv.remote_state, dlv.remote.data))
dlv.settle()
def _handle_receiver_update(self, event):
dlv = event.delivery
if dlv.settled:
if dlv.local_state != 777 or len(dlv.local.data) != 4:
return self._done(error="Unexpected local state at sender: %s %s" %
(dlv.local_state, dlv.local.data))
if dlv.remote_state != 666 or len(dlv.remote.data) != 4:
return self._done(error="Unexpected remote state at sender: %s %s" %
(dlv.remote_state, dlv.remote.data))
dlv.settle()
self._settled += 1
if self._settled == self._total:
self._done()
def on_delivery(self, event):
if event.delivery.link.is_sender:
self._handle_sender_update(event)
else:
self._handle_receiver_update(event)
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())