| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from __future__ import unicode_literals |
| from __future__ import division |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| from time import sleep |
| import json |
| import os |
| import logging |
| from threading import Timer |
| from subprocess import PIPE, STDOUT |
| from proton import Message, Delivery, symbol, Condition |
| from system_test import Logger, TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR, TestTimeout |
| from system_test import AsyncTestReceiver |
| from system_test import AsyncTestSender |
| from system_test import get_inter_router_links |
| from system_test import unittest |
| from test_broker import FakeService |
| |
| from proton import Described, ulong |
| |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container, AtLeastOnce |
| from proton.utils import BlockingConnection |
| from qpid_dispatch.management.client import Node |
| CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451} |
| |
| |
| class TwoRouterTest(TestCase): |
| |
| inter_router_port = None |
| |
| @classmethod |
| def setUpClass(cls): |
| """Start a router and a messenger""" |
| super(TwoRouterTest, cls).setUpClass() |
| |
| def router(name, client_server, connection): |
| policy_config_path = os.path.join(DIR, 'two-router-policy') |
| |
| config = [ |
| # Use the deprecated attributes helloInterval, raInterval, raIntervalFlux, remoteLsMaxAge |
| # The routers should still start successfully after using these deprecated entities. |
| ('router', {'remoteLsMaxAge': 60, 'helloInterval': 1, 'raInterval': 30, 'raIntervalFlux': 4, |
| 'mode': 'interior', 'id': 'QDR.%s' % name, 'allowUnsettledMulticast': 'yes'}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'linkCapacity': 500}), |
| |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'both'}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}), |
| |
| ('address', {'prefix': 'closest', 'distribution': 'closest'}), |
| ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| |
| # for testing pattern matching |
| ('address', {'pattern': 'a.b.c.d', |
| 'distribution': 'closest'}), |
| ('address', {'pattern': '#.b.c.d', |
| 'distribution': 'multicast'}), |
| ('address', {'pattern': 'a/*/#/d', |
| 'distribution': 'closest'}), |
| ('address', {'pattern': '*/b/c/d', |
| 'distribution': 'multicast'}), |
| ('address', {'pattern': 'a.x.d', |
| 'distribution': 'closest'}), |
| ('address', {'pattern': 'a.*.d', |
| 'distribution': 'multicast'}), |
| connection |
| ] |
| |
| config = Qdrouterd.Config(config) |
| |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) |
| |
| cls.routers = [] |
| |
| inter_router_port = cls.tester.get_port() |
| |
| router('A', 'server', |
| ('listener', {'role': 'inter-router', 'port': inter_router_port})) |
| |
| router('B', 'client', |
| ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port})) |
| |
| cls.routers[0].wait_router_connected('QDR.B') |
| cls.routers[1].wait_router_connected('QDR.A') |
| |
| def address(self): |
| return self.routers[0].addresses[0] |
| |
| def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): |
| p = self.popen( |
| ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], |
| stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, |
| universal_newlines=True) |
| out = p.communicate(input)[0] |
| try: |
| p.teardown() |
| except Exception as e: |
| raise Exception(out if out else str(e)) |
| return out |
| |
| def test_01_pre_settled(self): |
| test = DeliveriesInTransit(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| # deliveriesTransit must most surely be greater than num_msgs |
| pos = outs.attribute_names.index("deliveriesTransit") |
| results = outs.results[0] |
| self.assertTrue(results[pos] > 104) |
| |
| def test_02a_multicast_unsettled(self): |
| test = MulticastUnsettled(self.routers[0].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_02c_sender_settles_first(self): |
| test = SenderSettlesFirst(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_03_message_annotations(self): |
| test = MessageAnnotationsTest(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_03a_test_strip_message_annotations_no(self): |
| test = MessageAnnotationsStripTest(self.routers[0].addresses[1], self.routers[1].addresses[1]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_03a_test_strip_message_annotations_no_add_trace(self): |
| test = MessageAnnotationsStripAddTraceTest(self.routers[0].addresses[1], self.routers[1].addresses[1]) |
| test.run() |
| # Dump the logger output only if there is a test error, otherwise dont bother |
| if test.error: |
| test.logger.dump() |
| self.assertEqual(None, test.error) |
| |
| def test_03a_test_strip_message_annotations_both_add_ingress_trace(self): |
| test = MessageAnnotationsStripBothAddIngressTrace(self.routers[0].addresses[2], self.routers[1].addresses[2]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_03a_test_strip_message_annotations_out(self): |
| test = MessageAnnotationsStripMessageAnnotationsOut(self.routers[0].addresses[3], self.routers[1].addresses[3]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_03a_test_strip_message_annotations_in(self): |
| test = MessageAnnotationStripMessageAnnotationsIn(self.routers[0].addresses[4], self.routers[1].addresses[4]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_04_management(self): |
| test = ManagementTest(self.routers[0].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_06_semantics_closest_is_local(self): |
| test = SemanticsClosestIsLocal(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_07_semantics_closest_is_remote(self): |
| test = SemanticsClosestIsRemote(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_08_semantics_balanced(self): |
| test = SemanticsBalanced(self.routers[0].addresses[0], self.routers[0].addresses[1], |
| self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_09_to_override(self): |
| test = MessageAnnotaionsPreExistingOverride(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_10_propagated_disposition(self): |
| test = PropagatedDisposition(self, self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertTrue(test.passed) |
| |
| def test_10a_propagated_disposition_data(self): |
| test = PropagatedDispositionData(self, self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertTrue(test.passed) |
| |
| def test_11_three_ack(self): |
| test = ThreeAck(self, self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| |
| def test_12_excess_deliveries_released(self): |
| """ |
| Message-route a series of deliveries where the receiver provides credit for a subset and |
| once received, closes the link. The remaining deliveries should be released back to the sender. |
| """ |
| test = ExcessDeliveriesReleasedTest(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_15_attach_on_inter_router(self): |
| test = AttachOnInterRouterTest(self.routers[0].addresses[5]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_17_address_wildcard(self): |
| # verify proper distribution is selected by wildcard |
| addresses = [ |
| # (address, count of messages expected to be received) |
| ('a.b.c.d', 1), # closest 'a.b.c.d' |
| ('b.c.d', 2), # multi '#.b.c.d' |
| ('f.a.b.c.d', 2), # multi '#.b.c.d |
| ('a.c.d', 2), # multi 'a.*.d' |
| ('a/c/c/d', 1), # closest 'a/*/#.d |
| ('a/x/z/z/d', 1), # closest 'a/*/#.d |
| ('a/x/d', 1), # closest 'a.x.d' |
| ('a.x.e', 1), # balanced ---- |
| ('m.b.c.d', 2) # multi '*/b/c/d' |
| ] |
| |
| # two receivers per address - one for each router |
| receivers = [] |
| for a in addresses: |
| for x in range(2): |
| ar = AsyncTestReceiver(address=self.routers[x].addresses[0], |
| source=a[0]) |
| receivers.append(ar) |
| |
| # wait for the consumer info to propagate |
| for a in addresses: |
| self.routers[0].wait_address(a[0], 1, 1) |
| self.routers[1].wait_address(a[0], 1, 1) |
| |
| # send one message to each address |
| conn = BlockingConnection(self.routers[0].addresses[0]) |
| sender = conn.create_sender(address=None, options=AtLeastOnce()) |
| for a in addresses: |
| sender.send(Message(address=a[0], body={'address': a[0]})) |
| |
| # count received messages by address |
| msgs_recvd = {} |
| for M in receivers: |
| try: |
| while True: |
| i = M.queue.get(timeout=0.2).body.get('address', "ERROR") |
| if i not in msgs_recvd: |
| msgs_recvd[i] = 0 |
| msgs_recvd[i] += 1 |
| except AsyncTestReceiver.Empty: |
| pass |
| |
| # verify expected count == actual count |
| self.assertNotIn("ERROR", msgs_recvd) |
| for a in addresses: |
| self.assertIn(a[0], msgs_recvd) |
| self.assertEqual(a[1], msgs_recvd[a[0]]) |
| |
| for M in receivers: |
| M.stop() |
| conn.close() |
| |
| def test_17_large_streaming_test(self): |
| test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_18_single_char_dest_test(self): |
| test = SingleCharacterDestinationTest(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_19_delete_inter_router_connection(self): |
| """ |
| This test tries to delete an inter-router connection but is |
| prevented from doing so. |
| """ |
| query_command = 'QUERY --type=connection' |
| outputs = json.loads(self.run_qdmanage(query_command)) |
| identity = None |
| passed = False |
| |
| for output in outputs: |
| if "inter-router" == output['role']: |
| identity = output['identity'] |
| if identity: |
| update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity |
| try: |
| json.loads(self.run_qdmanage(update_command)) |
| except Exception as e: |
| if "Forbidden" in str(e): |
| passed = True |
| |
| # The test has passed since we were forbidden from deleting |
| # inter-router connections even though we are allowed to update the adminStatus field. |
| self.assertTrue(passed) |
| |
| def test_20_delete_connection(self): |
| """ |
| This test creates a blocking connection and tries to delete that connection. |
| Since there is no policy associated with this router, the default for allowAdminStatusUpdate is true, |
| the delete operation will be permitted. |
| """ |
| |
| # Create a connection with some properties so we can easily identify the connection |
| connection = BlockingConnection(self.address(), |
| properties=CONNECTION_PROPERTIES_UNICODE_STRING) |
| query_command = 'QUERY --type=connection' |
| outputs = json.loads(self.run_qdmanage(query_command)) |
| identity = None |
| passed = False |
| |
| print() |
| |
| for output in outputs: |
| if output.get('properties'): |
| conn_properties = output['properties'] |
| # Find the connection that has our properties - CONNECTION_PROPERTIES_UNICODE_STRING |
| # Delete that connection and run another qdmanage to see |
| # if the connection is gone. |
| if conn_properties.get('int_property'): |
| identity = output.get("identity") |
| if identity: |
| update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity |
| try: |
| self.run_qdmanage(update_command) |
| query_command = 'QUERY --type=connection' |
| outputs = json.loads( |
| self.run_qdmanage(query_command)) |
| no_properties = True |
| for output in outputs: |
| if output.get('properties'): |
| no_properties = False |
| conn_properties = output['properties'] |
| if conn_properties.get('int_property'): |
| passed = False |
| break |
| else: |
| passed = True |
| if no_properties: |
| passed = True |
| except Exception as e: |
| passed = False |
| |
| # The test has passed since we were allowed to delete a connection |
| # because we have the policy permission to do so. |
| self.assertTrue(passed) |
| |
| def test_21_delete_connection_with_receiver(self): |
| test = DeleteConnectionWithReceiver(self.routers[0].addresses[0]) |
| self.assertEqual(test.error, None) |
| test.run() |
| |
| def test_30_huge_address(self): |
| # try a link with an extremely long address |
| # DISPATCH-1461 |
| addr = "A" * 2019 |
| rx = AsyncTestReceiver(self.routers[0].addresses[0], |
| source=addr) |
| tx = AsyncTestSender(self.routers[1].addresses[0], |
| target=addr, |
| count=100) |
| tx.wait() |
| |
| i = 100 |
| while i: |
| try: |
| rx.queue.get(timeout=TIMEOUT) |
| i -= 1 |
| except AsyncTestReceiver.Empty: |
| break |
| self.assertEqual(0, i) |
| rx.stop() |
| |
| |
| class DeleteConnectionWithReceiver(MessagingHandler): |
| def __init__(self, address): |
| super(DeleteConnectionWithReceiver, self).__init__() |
| self.address = address |
| self.mgmt_receiver = None |
| self.mgmt_receiver_1 = None |
| self.mgmt_receiver_2 = None |
| self.conn_to_kill = None |
| self.mgmt_conn = None |
| self.mgmt_sender = None |
| self.success = False |
| self.error = None |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| |
| # Create a receiver connection with some properties so it |
| # can be easily identified. |
| self.conn_to_kill = event.container.connect(self.address, properties=CONNECTION_PROPERTIES_UNICODE_STRING) |
| self.receiver_to_kill = event.container.create_receiver(self.conn_to_kill, "hello_world") |
| self.mgmt_conn = event.container.connect(self.address) |
| self.mgmt_sender = event.container.create_sender(self.mgmt_conn) |
| self.mgmt_receiver = event.container.create_receiver(self.mgmt_conn, None, dynamic=True) |
| self.mgmt_receiver_1 = event.container.create_receiver(self.mgmt_conn, |
| None, |
| dynamic=True) |
| self.mgmt_receiver_2 = event.container.create_receiver(self.mgmt_conn, |
| None, |
| dynamic=True) |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received) |
| self.mgmt_conn.close() |
| |
| def bail(self, error): |
| self.error = error |
| self.timer.cancel() |
| self.mgmt_conn.close() |
| self.conn_to_kill.close() |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.mgmt_receiver: |
| request = Message() |
| request.address = "amqp:/_local/$management" |
| request.properties = { |
| u'type': u'org.apache.qpid.dispatch.connection', |
| u'operation': u'QUERY'} |
| request.reply_to = self.mgmt_receiver.remote_source.address |
| self.mgmt_sender.send(request) |
| |
| def on_message(self, event): |
| if event.receiver == self.mgmt_receiver: |
| attribute_names = event.message.body['attributeNames'] |
| property_index = attribute_names .index('properties') |
| identity_index = attribute_names .index('identity') |
| |
| for result in event.message.body['results']: |
| if result[property_index]: |
| properties = result[property_index] |
| if properties.get('int_property'): |
| identity = result[identity_index] |
| if identity: |
| request = Message() |
| request.address = "amqp:/_local/$management" |
| request.properties = { |
| u'identity': identity, |
| u'type': u'org.apache.qpid.dispatch.connection', |
| u'operation': u'UPDATE' |
| } |
| request.body = { |
| u'adminStatus': u'deleted'} |
| request.reply_to = self.mgmt_receiver_1.remote_source.address |
| self.mgmt_sender.send(request) |
| elif event.receiver == self.mgmt_receiver_1: |
| if event.message.properties['statusDescription'] == 'OK' and event.message.body['adminStatus'] == 'deleted': |
| request = Message() |
| request.address = "amqp:/_local/$management" |
| request.properties = {u'type': u'org.apache.qpid.dispatch.connection', |
| u'operation': u'QUERY'} |
| request.reply_to = self.mgmt_receiver_2.remote_source.address |
| self.mgmt_sender.send(request) |
| |
| elif event.receiver == self.mgmt_receiver_2: |
| attribute_names = event.message.body['attributeNames'] |
| property_index = attribute_names .index('properties') |
| identity_index = attribute_names .index('identity') |
| |
| for result in event.message.body['results']: |
| if result[property_index]: |
| properties = result[property_index] |
| if properties and properties.get('int_property'): |
| self.bail("Connection not deleted") |
| self.bail(None) |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class SingleCharacterDestinationTest(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(SingleCharacterDestinationTest, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "x" |
| self.error = None |
| self.conn1 = None |
| self.conn2 = None |
| self.count = 1 |
| self.n_sent = 0 |
| self.timer = None |
| self.sender = None |
| self.receiver = None |
| self.n_received = 0 |
| self.body = "xyz" |
| |
| def check_if_done(self): |
| if self.n_received == self.count: |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received) |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn1 = event.container.connect(self.address1) |
| self.conn2 = event.container.connect(self.address2) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.count: |
| msg = Message(body=self.body) |
| event.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| self.n_received += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class LargeMessageStreamTest(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(LargeMessageStreamTest, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "LargeMessageStreamTest" |
| self.error = None |
| self.conn1 = None |
| self.conn2 = None |
| self.count = 10 |
| self.n_sent = 0 |
| self.timer = None |
| self.sender = None |
| self.receiver = None |
| self.n_received = 0 |
| self.body = "" |
| for i in range(10000): |
| self.body += "0123456789101112131415" |
| |
| def check_if_done(self): |
| if self.n_received == self.count: |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received) |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn1 = event.container.connect(self.address1) |
| self.conn2 = event.container.connect(self.address2) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.count: |
| msg = Message(body=self.body) |
| # send(msg) calls the stream function which streams data from sender to the router |
| event.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| self.n_received += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class ExcessDeliveriesReleasedTest(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0) |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "closest.EDRtest" |
| self.error = None |
| self.sender = None |
| self.receiver = None |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.n_accepted = 0 |
| self.n_released = 0 |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| |
| def timeout(self): |
| self.error = "Timeout Expired" |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn1 = event.container.connect(self.address1) |
| self.conn2 = event.container.connect(self.address2) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| self.receiver.flow(6) |
| |
| def on_sendable(self, event): |
| for i in range(10 - self.n_sent): |
| msg = Message(body=i) |
| event.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| |
| def on_released(self, event): |
| self.n_released += 1 |
| if self.n_released == 4: |
| if self.n_accepted != 6: |
| self.error = "Expected 6 accepted, got %d" % self.n_accepted |
| if self.n_received != 6: |
| self.error = "Expected 6 received, got %d" % self.n_received |
| self.conn1.close() |
| self.conn2.close() |
| self.timer.cancel() |
| |
| def on_message(self, event): |
| self.n_received += 1 |
| if self.n_received == 6: |
| self.receiver.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class AttachOnInterRouterTest(MessagingHandler): |
| """Expect an error when attaching a link to an inter-router listener""" |
| |
| def __init__(self, address): |
| super(AttachOnInterRouterTest, self).__init__(prefetch=0) |
| self.address = address |
| self.dest = "AOIRtest" |
| self.error = None |
| self.sender = None |
| self.timer = None |
| self.conn = None |
| |
| def timeout(self): |
| self.error = "Timeout Expired" |
| self.conn.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn = event.container.connect(self.address) |
| self.sender = event.container.create_sender(self.conn, self.dest) |
| |
| def on_link_remote_close(self, event): |
| self.conn.close() |
| self.timer.cancel() |
| |
| def run(self): |
| logging.disable(logging.ERROR) # Hide expected log errors |
| try: |
| Container(self).run() |
| finally: |
| logging.disable(logging.NOTSET) # Restore to normal |
| |
| |
| class DeliveriesInTransit(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(DeliveriesInTransit, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "pre_settled.1" |
| self.error = "All messages not received" |
| self.n_sent = 0 |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.num_msgs = 104 |
| self.sent_count = 0 |
| self.received_count = 0 |
| self.receiver = None |
| |
| def timeout(self): |
| self.error = "Timeout Expired: n_sent=%d n_received_count=%d" % (self.n_sent, self.received_count) |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_sendable(self, event): |
| if self.n_sent <= self.num_msgs - 1: |
| msg = Message(body="Hello World") |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def check_if_done(self): |
| if self.n_sent == self.received_count: |
| self.error = None |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_message(self, event): |
| self.received_count += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MessageAnnotationsTest(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(MessageAnnotationsTest, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "ma/1" |
| self.error = "Message annotations not found" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver = None |
| self.sent_count = 0 |
| self.msg_not_sent = True |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def on_sendable(self, event): |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| event.sender.send(msg) |
| self.msg_not_sent = False |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| if 0 == event.message.body['number']: |
| ma = event.message.annotations |
| if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']: |
| self.error = None |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MessageAnnotationsStripTest(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(MessageAnnotationsStripTest, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "message_annotations_strip_no/1" |
| self.error = "Message annotations not found" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver = None |
| self.sent_count = 0 |
| self.msg_not_sent = True |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| ingress_message_annotations = {'work': 'hard', 'stay': 'humble'} |
| msg.annotations = ingress_message_annotations |
| event.sender.send(msg) |
| self.msg_not_sent = False |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| if 0 == event.message.body['number']: |
| ma = event.message.annotations |
| if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B'] \ |
| and ma['work'] == 'hard' and ma['stay'] == 'humble': |
| self.error = None |
| self.accept(event.delivery) |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class ManagementTest(MessagingHandler): |
| def __init__(self, address): |
| super(ManagementTest, self).__init__() |
| self.address = address |
| self.timer = None |
| self.conn = None |
| self.sender = None |
| self.receiver = None |
| self.sent_count = 0 |
| self.msg_not_sent = True |
| self.error = None |
| self.response1 = False |
| self.response2 = False |
| |
| def timeout(self): |
| if not self.response1: |
| self.error = "Incorrect response received for message with correlation id C1" |
| if not self.response1: |
| self.error = self.error + "Incorrect response received for message with correlation id C2" |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn = event.container.connect(self.address) |
| self.sender = event.container.create_sender(self.conn) |
| self.receiver = event.container.create_receiver(self.conn, None, dynamic=True) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| request = Message() |
| request.correlation_id = "C1" |
| request.address = "amqp:/_local/$management" |
| request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'} |
| request.reply_to = self.receiver.remote_source.address |
| self.sender.send(request) |
| |
| request = Message() |
| request.address = "amqp:/_topo/0/QDR.B/$management" |
| request.correlation_id = "C2" |
| request.reply_to = self.receiver.remote_source.address |
| request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'} |
| self.sender.send(request) |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| if event.message.correlation_id == "C1": |
| if event.message.properties['statusCode'] == 200 and \ |
| event.message.properties['statusDescription'] is not None \ |
| and 'amqp:/_topo/0/QDR.B/$management' in event.message.body: |
| self.response1 = True |
| elif event.message.correlation_id == "C2": |
| if event.message.properties['statusCode'] == 200 and \ |
| event.message.properties['statusDescription'] is not None \ |
| and 'amqp:/_topo/0/QDR.A/$management' in event.message.body: |
| self.response2 = True |
| |
| if self.response1 and self.response2: |
| self.error = None |
| |
| if self.error is None: |
| self.timer.cancel() |
| self.conn.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MessageAnnotationStripMessageAnnotationsIn(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(MessageAnnotationStripMessageAnnotationsIn, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "strip_message_annotations_in/1" |
| self.error = "Message annotations not found" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver = None |
| self.sent_count = 0 |
| self.msg_not_sent = True |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| # |
| # Pre-existing ingress and trace |
| # |
| ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['X/QDR']} |
| msg.annotations = ingress_message_annotations |
| event.sender.send(msg) |
| self.msg_not_sent = False |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| if 0 == event.message.body['number']: |
| if event.message.annotations['x-opt-qd.ingress'] == '0/QDR.A' \ |
| and event.message.annotations['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']: |
| self.error = None |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MessageAnnotaionsPreExistingOverride(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(MessageAnnotaionsPreExistingOverride, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "toov/1" |
| self.error = "Pre-existing x-opt-qd.to has been stripped" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver = None |
| self.msg_not_sent = True |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| msg.annotations = {'x-opt-qd.to': 'toov/1'} |
| event.sender.send(msg) |
| self.msg_not_sent = False |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| if 0 == event.message.body['number']: |
| ma = event.message.annotations |
| if ma['x-opt-qd.to'] == 'toov/1': |
| self.error = None |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MessageAnnotationsStripMessageAnnotationsOut(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(MessageAnnotationsStripMessageAnnotationsOut, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "strip_message_annotations_out/1" |
| self.error = "Message annotations not found" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver = None |
| self.sent_count = 0 |
| self.msg_not_sent = True |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| event.sender.send(msg) |
| self.msg_not_sent = False |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| if 0 == event.message.body['number']: |
| if event.message.annotations is None: |
| self.error = None |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MessageAnnotationsStripBothAddIngressTrace(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(MessageAnnotationsStripBothAddIngressTrace, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "strip_message_annotations_both_add_ingress_trace/1" |
| self.error = "Message annotations not found" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver = None |
| self.sent_count = 0 |
| self.msg_not_sent = True |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| ingress_message_annotations = {'work': 'hard', |
| 'x-opt-qd': 'humble', |
| 'x-opt-qd.ingress': 'ingress-router', |
| 'x-opt-qd.trace': ['0/QDR.A']} |
| msg.annotations = ingress_message_annotations |
| event.sender.send(msg) |
| self.msg_not_sent = False |
| |
| def on_message(self, event): |
| if self.receiver == event.receiver: |
| if 0 == event.message.body['number']: |
| if event.message.annotations == {'work': 'hard', 'x-opt-qd': 'humble'}: |
| self.error = None |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MessageAnnotationsStripAddTraceTest(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(MessageAnnotationsStripAddTraceTest, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "message_annotations_strip_no/1" |
| self.error = "Message annotations not found" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver = None |
| self.sent_count = 0 |
| self.msg_not_sent = True |
| self.logger = Logger(title="MessageAnnotationsStripAddTraceTest") |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| self.logger.log("on_start(): Receiver link created on self.conn2") |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| ingress_message_annotations = {'x-opt-qd.trace': ['0/QDR.1']} |
| msg.annotations = ingress_message_annotations |
| event.sender.send(msg) |
| self.msg_not_sent = False |
| self.logger.log("on_sendable(): Message sent") |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| self.logger.log("on_link_opened(): Sender link created on self.conn1") |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.logger.log("on_message(): Message received by receiver") |
| if 0 == event.message.body['number']: |
| self.logger.log("on_message(): Message received by receiver body matches expected body") |
| ma = event.message.annotations |
| if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.1', '0/QDR.A', '0/QDR.B']: |
| self.logger.log("on_message(): Message annotations in message match expected message annotations. Success...") |
| self.error = None |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| else: |
| self.logger.log("on_message(): Message received by receiver, message annotations are not as expected") |
| self.logger.log(ma) |
| else: |
| self.logger.log("on_message(): Message received by receiver but body is not expected") |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class SenderSettlesFirst(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(SenderSettlesFirst, self).__init__(auto_accept=False) |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "closest.senderfirst.1" |
| self.error = "Message body received differs from the one sent" |
| self.n_sent = 0 |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.sent_count = 0 |
| self.received_count = 0 |
| self.receiver = None |
| self.msg_not_sent = True |
| |
| def timeout(self): |
| self.error = "Timeout Expired: " + self.error |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn2 = event.container.connect(self.address2) |
| self.receiver = event.container.create_receiver(self.conn2, self.dest) |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.conn1 = event.container.connect(self.address1) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def on_sendable(self, event): |
| if event.sender == self.sender: |
| if self.msg_not_sent: |
| msg = Message(body={'number': 0}) |
| dlv = event.sender.send(msg) |
| dlv.settle() |
| self.msg_not_sent = False |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| if 0 == event.message.body['number']: |
| self.error = None |
| self.accept(event.delivery) |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MulticastUnsettled(MessagingHandler): |
| def __init__(self, address): |
| super(MulticastUnsettled, self).__init__() |
| self.address = address |
| self.dest = "multicast.2" |
| self.error = None |
| self.n_sent = 0 |
| self.count = 3 |
| self.n_received_a = 0 |
| self.n_received_b = 0 |
| self.n_received_c = 0 |
| self.timer = None |
| self.conn = None |
| self.sender = None |
| self.receiver_a = None |
| self.receiver_b = None |
| self.receiver_c = None |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn = event.container.connect(self.address) |
| self.sender = event.container.create_sender(self.conn, self.dest) |
| self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A") |
| self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B") |
| self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C") |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \ |
| (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c) |
| self.conn.close() |
| |
| def check_if_done(self): |
| if self.n_received_a + self.n_received_b + self.n_received_c == self.count: |
| self.timer.cancel() |
| self.conn.close() |
| |
| def on_sendable(self, event): |
| if self.n_sent == 0: |
| msg = Message(body="MulticastUnsettled-Test") |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver_a: |
| self.n_received_a += 1 |
| if event.receiver == self.receiver_b: |
| self.n_received_b += 1 |
| if event.receiver == self.receiver_c: |
| self.n_received_c += 1 |
| |
| def on_accepted(self, event): |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class SemanticsClosestIsLocal(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(SemanticsClosestIsLocal, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "closest.1" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver_a = None |
| self.receiver_b = None |
| self.receiver_c = None |
| self.num_messages = 100 |
| self.n_received_a = 0 |
| self.n_received_b = 0 |
| self.n_received_c = 0 |
| self.error = None |
| self.n_sent = 0 |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn1 = event.container.connect(self.address1) |
| self.conn2 = event.container.connect(self.address2) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| # Receiver on same router as the sender must receive all the messages. The other two |
| # receivers are on the other router |
| self.receiver_a = event.container.create_receiver(self.conn1, self.dest, name="A") |
| self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B") |
| self.receiver_c = event.container.create_receiver(self.conn2, self.dest, name="C") |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \ |
| (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c) |
| self.conn1.close() |
| self.conn2.close() |
| |
| def check_if_done(self): |
| if self.n_received_a == 100 and self.n_received_b + self.n_received_c == 0: |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.num_messages: |
| msg = Message(body="SemanticsClosestIsLocal-Test") |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver_a: |
| self.n_received_a += 1 |
| if event.receiver == self.receiver_b: |
| self.n_received_b += 1 |
| if event.receiver == self.receiver_c: |
| self.n_received_c += 1 |
| |
| def on_accepted(self, event): |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class SemanticsClosestIsRemote(MessagingHandler): |
| def __init__(self, address1, address2): |
| super(SemanticsClosestIsRemote, self).__init__() |
| self.address1 = address1 |
| self.address2 = address2 |
| self.dest = "closest.1" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.sender = None |
| self.receiver_a = None |
| self.receiver_b = None |
| self.receiver_c = None |
| self.num_messages = 100 |
| self.n_received_a = 0 |
| self.n_received_b = 0 |
| self.error = None |
| self.n_sent = 0 |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn1 = event.container.connect(self.address1) |
| self.conn2 = event.container.connect(self.address2) |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| # Receiver on same router as the sender must receive all the messages. The other two |
| # receivers are on the other router |
| self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A") |
| self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B") |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d rcvd=%d/%d" % \ |
| (self.n_sent, self.n_received_a, self.n_received_b) |
| self.conn1.close() |
| self.conn2.close() |
| |
| def check_if_done(self): |
| if self.n_received_a + self.n_received_b == 100 and self.n_received_a > 0 and self.n_received_b > 0: |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.num_messages: |
| msg = Message(body="SemanticsClosestIsRemote-Test") |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver_a: |
| self.n_received_a += 1 |
| if event.receiver == self.receiver_b: |
| self.n_received_b += 1 |
| |
| def on_accepted(self, event): |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class CustomTimeout(object): |
| def __init__(self, parent): |
| self.parent = parent |
| |
| def addr_text(self, addr): |
| if not addr: |
| return "" |
| if addr[0] == 'M': |
| return addr[2:] |
| else: |
| return addr[1:] |
| |
| def on_timer_task(self, event): |
| local_node = Node.connect(self.parent.address1, timeout=TIMEOUT) |
| |
| res = local_node.query('org.apache.qpid.dispatch.router.address') |
| name = res.attribute_names.index('name') |
| found = False |
| for results in res.results: |
| if "balanced.1" == self.addr_text(results[name]): |
| found = True |
| break |
| |
| if found: |
| self.parent.cancel_custom() |
| self.parent.create_sender(event) |
| |
| else: |
| event.reactor.schedule(2, self) |
| |
| |
| class SemanticsBalanced(MessagingHandler): |
| def __init__(self, address1, address2, address3): |
| super(SemanticsBalanced, self).__init__(auto_accept=False, prefetch=0) |
| self.address1 = address1 |
| self.address2 = address2 |
| self.address3 = address3 |
| self.dest = "balanced.1" |
| self.timer = None |
| self.conn1 = None |
| self.conn2 = None |
| self.conn3 = None |
| self.sender = None |
| self.receiver_a = None |
| self.receiver_b = None |
| self.receiver_c = None |
| self.num_messages = 400 |
| self.n_received_a = 0 |
| self.n_received_b = 0 |
| self.n_received_c = 0 |
| self.error = None |
| self.n_sent = 0 |
| self.rx_set = [] |
| self.custom_timer = None |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.custom_timer = event.reactor.schedule(2, CustomTimeout(self)) |
| self.conn1 = event.container.connect(self.address1) |
| self.conn2 = event.container.connect(self.address2) |
| self.conn3 = event.container.connect(self.address3) |
| |
| # This receiver is on the same router as the sender |
| self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A") |
| |
| # These two receivers are connected to a different router than the sender |
| self.receiver_b = event.container.create_receiver(self.conn3, self.dest, name="B") |
| self.receiver_c = event.container.create_receiver(self.conn3, self.dest, name="C") |
| |
| self.receiver_a.flow(300) |
| self.receiver_b.flow(300) |
| self.receiver_c.flow(300) |
| |
| def cancel_custom(self): |
| self.custom_timer.cancel() |
| |
| def create_sender(self, event): |
| self.sender = event.container.create_sender(self.conn1, self.dest) |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \ |
| (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c) |
| self.conn1.close() |
| self.conn2.close() |
| self.conn3.close() |
| |
| def check_if_done(self): |
| if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages and \ |
| self.n_received_a > 0 and self.n_received_b > 0 and self.n_received_c > 0: |
| self.rx_set.sort() |
| all_messages_received = True |
| for i in range(self.num_messages): |
| if not i == self.rx_set[i]: |
| all_messages_received = False |
| |
| if all_messages_received: |
| self.timer.cancel() |
| self.conn1.close() |
| self.conn2.close() |
| self.conn3.close() |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.num_messages: |
| msg = Message(body={'number': self.n_sent}) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver_a: |
| self.n_received_a += 1 |
| self.rx_set.append(event.message.body['number']) |
| elif event.receiver == self.receiver_b: |
| self.n_received_b += 1 |
| self.rx_set.append(event.message.body['number']) |
| elif event.receiver == self.receiver_c: |
| self.n_received_c += 1 |
| self.rx_set.append(event.message.body['number']) |
| |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class PropagatedDisposition(MessagingHandler): |
| """ |
| Verify outcomes are properly sent end-to-end |
| """ |
| |
| def __init__(self, test, address1, address2): |
| super(PropagatedDisposition, self).__init__(auto_accept=False) |
| self.address1 = address1 |
| self.address2 = address2 |
| self.settled = [] |
| self.test = test |
| self.sender = None |
| self.receiver = None |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.passed = False |
| self.dispos = ['accept', 'modified', 'reject'] |
| self.dispos_index = 0 |
| self.trackers = {} |
| self.addr = "unsettled/2" |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.sender_conn = event.container.connect(self.address1) |
| self.receiver_conn = event.container.connect(self.address2) |
| |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| self.addr) |
| self.sender = event.container.create_sender(self.sender_conn, |
| self.addr) |
| |
| def on_sendable(self, event): |
| # This function is called when the sender has credit to send |
| if self.dispos_index < 3: |
| self.trackers[self.sender.send(Message(body=self.dispos[self.dispos_index]))] = self.dispos[self.dispos_index] |
| self.dispos_index += 1 |
| |
| def timeout(self): |
| unique_list = sorted(list(dict.fromkeys(self.settled))) |
| self.error = "Timeout Expired: Expected ['accept', 'modified', 'reject'] got %s" % unique_list |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def check(self): |
| unique_list = sorted(list(dict.fromkeys(self.settled))) |
| if unique_list == [u'accept', u'modified', u'reject']: |
| self.passed = True |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def on_message(self, event): |
| if event.message.body == u'accept': |
| event.delivery.update(Delivery.ACCEPTED) |
| event.delivery.settle() |
| elif event.message.body == u'reject': |
| self.set_rejected_data(event.delivery.local) |
| event.delivery.update(Delivery.REJECTED) |
| event.delivery.settle() |
| elif event.message.body == u'modified': |
| self.set_modified_data(event.delivery.local) |
| event.delivery.update(Delivery.MODIFIED) |
| event.delivery.settle() |
| |
| def on_accepted(self, event): |
| self.test.assertEqual(Delivery.ACCEPTED, event.delivery.remote_state) |
| self.test.assertEqual('accept', self.trackers[event.delivery]) |
| self.settled.append('accept') |
| self.check() |
| |
| def on_rejected(self, event): |
| self.test.assertEqual(Delivery.REJECTED, event.delivery.remote_state) |
| self.test.assertEqual('reject', self.trackers[event.delivery]) |
| self.check_rejected_data(event.delivery.remote) |
| self.settled.append('reject') |
| self.check() |
| |
| def on_released(self, event): |
| # yes, for some reason Proton triggers on_released when MODIFIED is set |
| self.test.assertEqual(Delivery.MODIFIED, event.delivery.remote_state) |
| self.test.assertEqual('modified', self.trackers[event.delivery]) |
| self.check_modified_data(event.delivery.remote) |
| self.settled.append('modified') |
| self.check() |
| |
| def set_rejected_data(self, local_state): |
| # use defaults |
| pass |
| |
| def check_rejected_data(self, remote_state): |
| self.test.assertTrue(remote_state.condition is None) |
| |
| def set_modified_data(self, local_state): |
| # use defaults |
| pass |
| |
| def check_modified_data(self, remote_state): |
| self.test.assertTrue(remote_state.failed) |
| self.test.assertFalse(remote_state.undeliverable) |
| self.test.assertTrue(remote_state.annotations is None) |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class PropagatedDispositionData(PropagatedDisposition): |
| """ |
| Verify that data associated with a terminal outcome is correctly passed end |
| to end |
| """ |
| |
| def set_rejected_data(self, local_state): |
| local_state.condition = Condition("name", |
| str("description"), |
| {symbol("info"): True}) |
| |
| def check_rejected_data(self, remote_state): |
| cond = remote_state.condition |
| self.test.assertEqual("name", cond.name) |
| self.test.assertEqual("description", cond.description) |
| self.test.assertTrue(cond.info is not None) |
| self.test.assertTrue(symbol("info") in cond.info) |
| self.test.assertEqual(True, cond.info[symbol("info")]) |
| |
| def set_modified_data(self, local_state): |
| local_state.failed = True |
| local_state.undeliverable = True |
| local_state.annotations = {symbol('modified'): True} |
| |
| def check_modified_data(self, remote_state): |
| self.test.assertTrue(remote_state.failed) |
| self.test.assertTrue(remote_state.undeliverable) |
| self.test.assertTrue(remote_state.annotations is not None) |
| self.test.assertTrue(symbol('modified') in remote_state.annotations) |
| self.test.assertEqual(True, remote_state.annotations[symbol('modified')]) |
| |
| |
| class ThreeAck(MessagingHandler): |
| def __init__(self, test, address1, address2): |
| super(ThreeAck, self).__init__(auto_accept=False, auto_settle=False) |
| self.addrs = [address1, address2] |
| self.settled = [] |
| self.test = test |
| self.phase = 0 |
| |
| def on_start(self, event): |
| connections = [event.container.connect(a) for a in self.addrs] |
| addr = "three_ack/1" |
| self.sender = event.container.create_sender(connections[0], addr) |
| self.receiver = event.container.create_receiver(connections[1], addr) |
| self.receiver.flow(1) |
| self.tracker = self.sender.send(Message('hello')) |
| |
| def on_message(self, event): |
| self.test.assertEqual(0, self.phase) |
| self.phase = 1 |
| self.test.assertFalse(event.delivery.settled) |
| self.test.assertEqual(0, self.tracker.local_state) |
| self.test.assertEqual(0, self.tracker.remote_state) |
| event.delivery.update(Delivery.ACCEPTED) |
| # NOTE: we don't settle yet for 3-ack |
| |
| def on_accepted(self, event): |
| self.test.assertTrue(event.sender) |
| self.test.assertEqual(1, self.phase) |
| self.phase = 2 |
| self.test.assertEqual(Delivery.ACCEPTED, event.delivery.remote_state) |
| self.test.assertFalse(event.delivery.settled) |
| self.test.assertEqual(0, event.delivery.local_state) |
| event.delivery.settle() |
| self.test.assertFalse(event.delivery.settled) |
| event.connection.close() |
| |
| def on_settled(self, event): |
| self.test.assertTrue(event.receiver) |
| self.test.assertEqual(2, self.phase) |
| self.phase = 3 |
| event.connection.close() |
| |
| def run(self): |
| Container(self).run() |
| self.test.assertEqual(3, self.phase) |
| |
| |
| class TwoRouterConnection(TestCase): |
| def __init__(self, test_method): |
| TestCase.__init__(self, test_method) |
| self.success = False |
| self.timer_delay = 4 |
| self.max_attempts = 2 |
| self.attempts = 0 |
| self.local_node = None |
| |
| @classmethod |
| def router(cls, name, config): |
| config = Qdrouterd.Config(config) |
| |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TwoRouterConnection, cls).setUpClass() |
| |
| cls.routers = [] |
| |
| cls.B_normal_port_1 = cls.tester.get_port() |
| cls.B_normal_port_2 = cls.tester.get_port() |
| |
| TwoRouterConnection.router('A', [ |
| ('router', {'mode': 'interior', 'id': 'A'}), |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', |
| 'port': cls.tester.get_port()}), |
| ] |
| ) |
| |
| TwoRouterConnection.router('B', |
| [ |
| ('router', {'mode': 'interior', 'id': 'B'}), |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', |
| 'port': cls.B_normal_port_1}), |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', |
| 'port': cls.B_normal_port_2}), |
| |
| ] |
| ) |
| |
| def address(self): |
| return self.routers[0].addresses[0] |
| |
| def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): |
| p = self.popen( |
| ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], |
| stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, |
| universal_newlines=True) |
| out = p.communicate(input)[0] |
| try: |
| p.teardown() |
| except Exception as e: |
| raise Exception(out if out else str(e)) |
| return out |
| |
| def can_terminate(self): |
| if self.attempts == self.max_attempts: |
| return True |
| |
| if self.success: |
| return True |
| |
| return False |
| |
| def check_connections(self): |
| res = self.local_node.query(type='org.apache.qpid.dispatch.connection') |
| results = res.results |
| |
| # If DISPATCH-1093 was not fixed, there would be an additional |
| # connection created and hence the len(results) would be 4 |
| |
| # Since DISPATCH-1093 is fixed, len(results would be 3 which is what |
| # we would expect. |
| |
| if len(results) != 3: |
| self.schedule_num_connections_test() |
| else: |
| self.success = True |
| |
| def schedule_num_connections_test(self): |
| if self.attempts < self.max_attempts: |
| if not self.success: |
| Timer(self.timer_delay, self.check_connections).start() |
| self.attempts += 1 |
| |
| def test_create_connectors(self): |
| self.local_node = Node.connect(self.routers[0].addresses[0], |
| timeout=TIMEOUT) |
| |
| res = self.local_node.query(type='org.apache.qpid.dispatch.connection') |
| results = res.results |
| |
| self.assertEqual(1, len(results)) |
| |
| long_type = 'org.apache.qpid.dispatch.connector' '' |
| |
| create_command = 'CREATE --type=' + long_type + ' --name=foo' + ' host=0.0.0.0 port=' + str(TwoRouterConnection.B_normal_port_1) |
| |
| self.run_qdmanage(create_command) |
| |
| create_command = 'CREATE --type=' + long_type + ' --name=bar' + ' host=0.0.0.0 port=' + str(TwoRouterConnection.B_normal_port_2) |
| |
| self.run_qdmanage(create_command) |
| |
| self.schedule_num_connections_test() |
| |
| while not self.can_terminate(): |
| pass |
| |
| self.assertTrue(self.success) |
| |
| |
| class PropagationTest(TestCase): |
| |
| inter_router_port = None |
| |
| @classmethod |
| def setUpClass(cls): |
| """Start a router and a messenger""" |
| super(PropagationTest, cls).setUpClass() |
| |
| def router(name, extra_config): |
| |
| config = [ |
| ('router', {'mode': 'interior', 'id': 'QDR.%s' % name}), |
| |
| ('listener', {'port': cls.tester.get_port()}), |
| |
| ] + extra_config |
| |
| config = Qdrouterd.Config(config) |
| |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) |
| |
| cls.routers = [] |
| |
| inter_router_port = cls.tester.get_port() |
| router('A', [('listener', {'role': 'inter-router', 'port': inter_router_port}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'})]) |
| router('B', [('connector', {'role': 'inter-router', 'port': inter_router_port})]) |
| |
| cls.routers[0].wait_router_connected('QDR.B') |
| cls.routers[1].wait_router_connected('QDR.A') |
| |
| def test_propagation_of_locally_undefined_address(self): |
| test = MulticastTestClient(self.routers[0].addresses[0], self.routers[1].addresses[0]) |
| test.run() |
| self.assertEqual(None, test.error) |
| self.assertEqual(test.received, 2) |
| |
| |
| class CreateReceiver(MessagingHandler): |
| def __init__(self, connection, address): |
| super(CreateReceiver, self).__init__() |
| self.connection = connection |
| self.address = address |
| |
| def on_timer_task(self, event): |
| event.container.create_receiver(self.connection, self.address) |
| |
| |
| class DelayedSend(MessagingHandler): |
| def __init__(self, connection, address, message): |
| super(DelayedSend, self).__init__() |
| self.connection = connection |
| self.address = address |
| self.message = message |
| |
| def on_timer_task(self, event): |
| event.container.create_sender(self.connection, self.address).send(self.message) |
| |
| |
| class MulticastTestClient(MessagingHandler): |
| def __init__(self, router1, router2): |
| super(MulticastTestClient, self).__init__() |
| self.routers = [router1, router2] |
| self.received = 0 |
| self.error = None |
| |
| def on_start(self, event): |
| self.connections = [event.container.connect(r) for r in self.routers] |
| event.container.create_receiver(self.connections[0], "multicast") |
| # wait for knowledge of receiver1 to propagate to second router |
| event.container.schedule(5, CreateReceiver(self.connections[1], "multicast")) |
| event.container.schedule(7, DelayedSend(self.connections[1], "multicast", Message(body="testing1,2,3"))) |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| |
| def on_message(self, event): |
| self.received += 1 |
| event.connection.close() |
| if self.received == 2: |
| self.timer.cancel() |
| |
| def timeout(self): |
| self.error = "Timeout Expired:received=%d" % self.received |
| for c in self.connections: |
| c.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class StreamingLinkScrubberTest(TestCase): |
| """ |
| Verify that unused inter-router streaming links are eventually reclaimed |
| """ |
| |
| @classmethod |
| def setUpClass(cls): |
| super(StreamingLinkScrubberTest, cls).setUpClass() |
| |
| def router(name, extra): |
| config = [ |
| ('router', {'id': 'Router%s' % name, |
| 'mode': 'interior'}), |
| ('listener', {'port': cls.tester.get_port(), |
| 'stripAnnotations': 'no'}), |
| ('address', {'prefix': 'closest', 'distribution': 'closest'}), |
| ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}) |
| |
| ] |
| |
| if extra: |
| config.extend(extra) |
| |
| config = Qdrouterd.Config(config) |
| |
| # run routers in test mode to shorten the streaming link scrubber |
| # interval to 5 seconds an the maximum pool size to two links |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=["--test-hooks"])) |
| |
| cls.routers = [] |
| |
| inter_router_port = cls.tester.get_port() |
| |
| router('A', |
| [('listener', {'role': 'inter-router', |
| 'port': inter_router_port})]) |
| cls.RouterA = cls.routers[-1] |
| cls.RouterA.listener = cls.RouterA.addresses[0] |
| |
| router('B', |
| [('connector', {'name': 'connectorToA', 'role': |
| 'inter-router', |
| 'port': inter_router_port})]) |
| cls.RouterB = cls.routers[-1] |
| cls.RouterB.listener = cls.RouterB.addresses[0] |
| |
| cls.RouterA.wait_router_connected('RouterB') |
| cls.RouterB.wait_router_connected('RouterA') |
| |
| def test_01_streaming_link_scrubber(self): |
| """ |
| Ensure extra streaming links are closed by the periodic scrubber |
| """ |
| address = "closest/scrubber" |
| |
| # scrubber removes at most 10 links per scan, the test pool size is 2 |
| sender_count = 12 |
| |
| # fire up a receiver on RouterB to get 1 message from each sender: |
| env = dict(os.environ, PN_TRACE_FRM="1") |
| cmd = ["test-receiver", |
| "-a", self.RouterB.listener, |
| "-s", address, |
| "-c", str(sender_count)] |
| rx = self.popen(cmd, env=env) |
| |
| self.RouterA.wait_address(address) |
| |
| # remember the count of inter-router links on A before we start streaming |
| pre_count = len(get_inter_router_links(self.RouterA.listener)) |
| |
| # fire off the senders |
| cmd = ["test-sender", |
| "-a", self.RouterA.listener, |
| "-t", address, |
| "-c", "1", |
| "-sx" |
| ] |
| senders = [self.popen(cmd, env=env) for x in range(sender_count)] |
| |
| for tx in senders: |
| out_text, out_error = tx.communicate(timeout=TIMEOUT) |
| if tx.returncode: |
| raise Exception("Sender failed: %s %s" % (out_text, out_error)) |
| |
| # expect: more inter-router links opened. Should be 12 more, but |
| # depending on when the scrubber runs it may be as low as two |
| post_count = len(get_inter_router_links(self.RouterA.listener)) |
| self.assertTrue(post_count > pre_count) |
| |
| # expect: after 5 seconds 10 of the links should be closed and 2 |
| # should remain (--test-hooks router option sets these parameters) |
| while (post_count - pre_count) > 2: |
| sleep(0.1) |
| post_count = len(get_inter_router_links(self.RouterA.listener)) |
| |
| out_text, out_error = rx.communicate(timeout=TIMEOUT) |
| if rx.returncode: |
| raise Exception("Receiver failed: %s %s" % (out_text, out_error)) |
| |
| |
| class TwoRouterExtensionStateTest(TestCase): |
| """ |
| Verify that routers propagate extended Disposition state correctly. |
| See DISPATCH-1703 |
| """ |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TwoRouterExtensionStateTest, cls).setUpClass() |
| |
| def router(name, extra_config): |
| |
| config = [ |
| ('router', {'mode': 'interior', |
| 'id': name}), |
| |
| ('listener', {'port': cls.tester.get_port()}), |
| |
| ('address', {'prefix': 'closest', 'distribution': 'closest'}), |
| ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| ] + extra_config |
| |
| config = Qdrouterd.Config(config) |
| return cls.tester.qdrouterd(name, config, wait=False) |
| |
| inter_router_port = cls.tester.get_port() |
| service_port = cls.tester.get_port() |
| |
| cls.RouterA = router('RouterA', |
| [ |
| ('listener', {'role': 'inter-router', |
| 'host': '0.0.0.0', |
| 'port': inter_router_port, |
| 'saslMechanisms': 'ANONYMOUS'}), |
| ]) |
| |
| cls.RouterB = router('RouterB', |
| [ |
| ('connector', {'name': 'toRouterA', |
| 'role': 'inter-router', |
| 'port': inter_router_port}), |
| ('listener', {'role': 'route-container', |
| 'host': '0.0.0.0', |
| 'port': service_port, |
| 'saslMechanisms': 'ANONYMOUS'}), |
| |
| ('linkRoute', {'prefix': 'RoutieMcRouteFace', |
| 'containerId': 'FakeService', |
| 'direction': 'in'}), |
| ('linkRoute', {'prefix': 'RoutieMcRouteFace', |
| 'containerId': 'FakeService', |
| 'direction': 'out'}), |
| ]) |
| |
| cls.RouterA.wait_router_connected('RouterB') |
| cls.RouterB.wait_router_connected('RouterA') |
| |
| def test_01_link_route(self): |
| """ |
| Verify non-terminal state and data propagates over a link route |
| """ |
| class MyExtendedService(FakeService): |
| """ |
| This service saves any outcome and extension data that arrives in a |
| transfer |
| """ |
| |
| def __init__(self, url, container_id=None): |
| self.remote_state = None |
| self.remote_data = None |
| super(MyExtendedService, self).__init__(url, container_id) |
| |
| def on_message(self, event): |
| self.remote_state = event.delivery.remote_state |
| self.remote_data = event.delivery.remote.data |
| super(MyExtendedService, self).on_message(event) |
| |
| fs = MyExtendedService(self.RouterB.addresses[1], |
| container_id="FakeService") |
| self.RouterA.wait_address("RoutieMcRouteFace", remotes=1, count=2) |
| |
| tx = MyExtendedSender(self.RouterA.addresses[0], |
| "RoutieMcRouteFace") |
| tx.wait() |
| fs.join() |
| self.assertEqual(999, fs.remote_state) |
| self.assertEqual([1, 2, 3], fs.remote_data) |
| |
| def test_02_closest(self): |
| """ |
| Verify non-terminal state and data propagates over anycase |
| """ |
| test = ExtensionStateTester(self.RouterA.addresses[0], |
| self.RouterB.addresses[0], |
| "closest/fleabag") |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_03_multicast(self): |
| """ |
| Verify that disposition state set by the publisher is available to all |
| consumers |
| """ |
| rxs = [MyExtendedReceiver(self.RouterA.addresses[0], |
| "multicast/thingy") |
| for x in range(3)] |
| self.RouterA.wait_address("multicast/thingy", subscribers=3) |
| sleep(0.5) # let subscribers grant credit |
| tx = MyExtendedSender(self.RouterB.addresses[0], |
| "multicast/thingy") |
| tx.wait() |
| |
| # DISPATCH-1705: only one of the receivers gets the data, but all |
| # should get the state |
| |
| ext_data = None |
| for rx in rxs: |
| rx.stop() |
| try: |
| while True: |
| dispo = rx.remote_states.pop() |
| self.assertEqual(999, dispo[0]) |
| ext_data = dispo[1] or ext_data |
| except IndexError: |
| pass |
| self.assertEqual([1, 2, 3], ext_data) |
| |
| def test_04_test_transactional_state(self): |
| """ |
| Verifies that the data sent in the state field of the disposition |
| is forwarded all the way back to the client. |
| """ |
| TRANS_STATE = 52 |
| RESPONSE_LOCAL_DATA = ["MyTxnIDResp", |
| Described(ulong(Delivery.ACCEPTED), [])] |
| |
| class MyExtendedService(FakeService): |
| """ |
| This service receives a transfer frame and sends back a |
| disposition frame with a state field. |
| For example, this service sends a disposition with the |
| following state field |
| state=@transactional-state(52) [txn-id="MyTxnIDResp", outcome=@accepted(36) []] |
| """ |
| def __init__(self, url, container_id=None): |
| self.remote_state = None |
| self.remote_data = None |
| super(MyExtendedService, self).__init__(url, container_id, |
| auto_accept=False, |
| auto_settle=False) |
| |
| def on_message(self, event): |
| self.remote_state = event.delivery.remote_state |
| self.remote_data = event.delivery.remote.data |
| if self.remote_state == TRANS_STATE and self.remote_data == ['MyTxnID']: |
| # This will send a disposition with |
| # state=@transactional-state(52) [txn-id="MyTxnIDResp", outcome=@accepted(36) []] |
| # We will make sure that this state was received |
| # by the sender. |
| event.delivery.local.data = RESPONSE_LOCAL_DATA |
| event.delivery.update(TRANS_STATE) |
| event.delivery.settle() |
| |
| # Start the service that connects to the route-container listener |
| # on the router with container_id="FakeService" |
| fs = MyExtendedService(self.RouterB.addresses[1], |
| container_id="FakeService") |
| |
| self.RouterA.wait_address("RoutieMcRouteFace", remotes=1, count=2) |
| |
| class MyTransactionStateSender(AsyncTestSender): |
| def on_sendable(self, event): |
| # Send just one delivery with a transactional state. |
| if self.sent < self.total: |
| self.sent += 1 |
| dlv = event.sender.delivery(str(self.sent)) |
| dlv.local.data = ["MyTxnID"] |
| # this will send a transfer frame to the router with |
| # state=@transactional-state(52) [txn-id="MyTxnID"] |
| dlv.update(TRANS_STATE) |
| event.sender.stream(self._message.encode()) |
| event.sender.advance() |
| |
| def on_settled(self, event): |
| self.remote_state = event.delivery.remote_state |
| self.remote_data = event.delivery.remote.data |
| if self.remote_state == TRANS_STATE and \ |
| self.remote_data == RESPONSE_LOCAL_DATA: |
| # This means that the router is passing the state it |
| # received from the service all the way back to the |
| # client. This would not happen without the fix |
| # for DISPATCH-2040 |
| self.accepted += 1 |
| self.test_passed = True |
| |
| tx = MyTransactionStateSender(self.RouterA.addresses[0], "RoutieMcRouteFace") |
| tx.wait() |
| fs.join() |
| self.assertTrue(tx.test_passed) |
| |
| |
| class MyExtendedSender(AsyncTestSender): |
| """ |
| This sender sets a non-terminal outcome and data on the outgoing |
| transfer |
| """ |
| |
| def on_sendable(self, event): |
| if self.sent < self.total: |
| dlv = event.sender.delivery(str(self.sent)) |
| dlv.local.data = [1, 2, 3] |
| dlv.update(999) |
| event.sender.stream(self._message.encode()) |
| event.sender.advance() |
| self.sent += 1 |
| |
| |
| class MyExtendedReceiver(AsyncTestReceiver): |
| """ |
| This receiver stores any remote delivery state that arrives with a message |
| transfer |
| """ |
| |
| def __init__(self, *args, **kwargs): |
| self.remote_states = [] |
| super(MyExtendedReceiver, self).__init__(*args, **kwargs) |
| |
| def on_message(self, event): |
| self.remote_states.append((event.delivery.remote_state, |
| event.delivery.remote.data)) |
| super(MyExtendedReceiver, self).on_message(event) |
| |
| |
| class ExtensionStateTester(MessagingHandler): |
| """ |
| Verify the routers propagate non-terminal outcome and extended state |
| disposition information in both message transfer and disposition frames. |
| |
| This tester creates a receiver and a sender link to a given address. |
| |
| The sender transfers a message with a non-terminal delivery state and |
| associated extension data. The receiver expects to find this state in the |
| incoming delivery. |
| |
| The receiver then responds with a non-terminal disposition that also has |
| extension state data. The sender expects to find this new state associated |
| with its delivery. |
| """ |
| |
| def __init__(self, ingress_router, egress_router, address): |
| super(ExtensionStateTester, self).__init__(auto_settle=False, |
| auto_accept=False) |
| self._in_router = ingress_router |
| self._out_router = egress_router |
| self._address = address |
| self._sender_conn = None |
| self._recvr_conn = None |
| self._sender = None |
| self._receiver = None |
| self._sent = 0 |
| self._received = 0 |
| self._settled = 0 |
| self._total = 10 |
| self._message = Message(body="XYZ" * (1024 * 1024 * 2)) |
| self.error = None |
| |
| def on_start(self, event): |
| self._reactor = event.reactor |
| self._sender_conn = event.container.connect(self._in_router) |
| self._sender = event.container.create_sender(self._sender_conn, |
| target=self._address, |
| name="ExtensionSender") |
| self._recvr_conn = event.container.connect(self._out_router) |
| self._receiver = event.container.create_receiver(self._recvr_conn, |
| source=self._address, |
| name="ExtensionReceiver") |
| |
| def _done(self, error=None): |
| self.error = error or self.error |
| self._sender.close() |
| self._sender_conn.close() |
| self._receiver.close() |
| self._recvr_conn.close() |
| |
| def on_sendable(self, event): |
| if self._sent < self._total: |
| self._sent += 1 |
| dlv = event.sender.delivery(str(self._sent)) |
| dlv.local.data = [1, 2, 3, self._sent] |
| dlv.update(666) # non-terminal state |
| self._message.id = self._sent |
| event.sender.stream(self._message.encode()) |
| event.sender.advance() |
| |
| def on_message(self, event): |
| dlv = event.delivery |
| msg_id = event.message.id |
| if dlv.remote_state != 666: |
| return self._done(error="Unexpected outcome '%s', expected '666'" |
| % dlv.remote_state) |
| remote_data = dlv.remote.data |
| expected_data = [1, 2, 3, msg_id] |
| if remote_data != expected_data: |
| return self._done(error="Unexpected dispo data '%s', expected '%s'" |
| % (remote_data, expected_data)) |
| |
| # send back a non-terminal outcome and more data |
| dlv.local.data = [10, 9, 8, msg_id] |
| dlv.update(777) |
| self._received += 1 |
| |
| def _handle_sender_update(self, event): |
| dlv = event.delivery |
| if dlv.local_state != 666 or len(dlv.local.data) != 4: |
| return self._done(error="Unexpected local state at sender: %s %s" % |
| (dlv.local_state, dlv.local.data)) |
| |
| if dlv.remote_state != 777 or len(dlv.remote.data) != 4: |
| return self._done(error="Unexpected remote state at sender: %s %s" % |
| (dlv.remote_state, dlv.remote.data)) |
| dlv.settle() |
| |
| def _handle_receiver_update(self, event): |
| dlv = event.delivery |
| if dlv.settled: |
| if dlv.local_state != 777 or len(dlv.local.data) != 4: |
| return self._done(error="Unexpected local state at sender: %s %s" % |
| (dlv.local_state, dlv.local.data)) |
| |
| if dlv.remote_state != 666 or len(dlv.remote.data) != 4: |
| return self._done(error="Unexpected remote state at sender: %s %s" % |
| (dlv.remote_state, dlv.remote.data)) |
| dlv.settle() |
| self._settled += 1 |
| if self._settled == self._total: |
| self._done() |
| |
| def on_delivery(self, event): |
| if event.delivery.link.is_sender: |
| self._handle_sender_update(event) |
| else: |
| self._handle_receiver_update(event) |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| if __name__ == '__main__': |
| unittest.main(main_module()) |