| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from time import sleep |
| |
| from proton import Condition, Message, Delivery |
| from system_test import TestCase, Qdrouterd, TIMEOUT, get_link_info, \ |
| get_inter_router_links, has_mobile_dest_in_address_table, PollTimeout, TestTimeout |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container |
| from qpid_dispatch.management.client import Node |
| |
| LARGE_PAYLOAD = ("X" * 1024) * 30 |
| |
| |
| _LINK_STATISTIC_KEYS = set(['unsettledCount', |
| 'undeliveredCount', |
| 'releasedCount', |
| 'presettledCount', |
| 'acceptedCount', |
| 'droppedPresettledCount', |
| 'rejectedCount', |
| 'deliveryCount', |
| 'modifiedCount']) |
| |
| def get_body(n_sent, large_message=False): |
| if large_message: |
| body = {'number': n_sent, 'msg': LARGE_PAYLOAD} |
| else: |
| body = {'number': n_sent} |
| |
| def _link_stats_are_zero(statistics, keys): |
| """ |
| Verify that all statistics whose keys are present are zero |
| """ |
| for key in keys: |
| if statistics.get(key) != 0: |
| return False |
| return True; |
| |
| |
| class OneRouterModifiedTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| """Start three routers""" |
| super(OneRouterModifiedTest, cls).setUpClass() |
| |
| listen_port = cls.tester.get_port() |
| config = Qdrouterd.Config([ |
| ('router', {'mode': 'standalone', 'id': 'A'}), |
| ('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})]) |
| |
| cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True) |
| |
| def router_modified_counts(self, large_message=False): |
| address = self.router.addresses[0] |
| |
| local_node = Node.connect(address, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| deliveries_modified_index = outs.attribute_names.index('modifiedDeliveries') |
| results = outs.results[0] |
| num_modified_deliveries_pre_test = results[deliveries_modified_index] |
| |
| num_messages = 10 |
| test = ModifiedDeliveriesTest(address, num_messages, large_message) |
| test.run() |
| |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| results = outs.results[0] |
| |
| self.assertEqual(results[deliveries_modified_index] - num_modified_deliveries_pre_test, num_messages) |
| |
| # check link statistics |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'modifiedCount']))) |
| self.assertEqual(test.sender_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.sender_stats['modifiedCount'], num_messages) |
| |
| # receiver just drops the link, so these are not counted as modified |
| # but unsettled instead |
| self.assertEqual(test.receiver_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.receiver_stats['unsettledCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'unsettledCount']))) |
| |
| def test_one_router_modified_counts(self): |
| self.router_modified_counts() |
| |
| def test_one_router_large_message_modified_counts(self): |
| self.router_modified_counts(True) |
| |
| class OneRouterRejectedTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| """Start three routers""" |
| super(OneRouterRejectedTest, cls).setUpClass() |
| |
| listen_port = cls.tester.get_port() |
| config = Qdrouterd.Config([ |
| ('router', {'mode': 'standalone', 'id': 'A'}), |
| ('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})]) |
| |
| cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True) |
| |
| def one_router_rejected_counts(self, large_message=False): |
| address = self.router.addresses[0] |
| |
| local_node = Node.connect(address, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| deliveries_rejected_index = outs.attribute_names.index('rejectedDeliveries') |
| results = outs.results[0] |
| deliveries_rejected_pre_test = results[deliveries_rejected_index] |
| |
| num_messages = 10 |
| test = RejectedDeliveriesTest(address, num_messages, large_message) |
| test.run() |
| |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| results = outs.results[0] |
| |
| self.assertEqual(results[deliveries_rejected_index] - deliveries_rejected_pre_test, num_messages) |
| |
| # check link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.sender_stats['rejectedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'rejectedCount']))) |
| |
| self.assertEqual(test.receiver_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.receiver_stats['rejectedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'rejectedCount']))) |
| |
| def test_one_router_rejected_counts(self): |
| self.one_router_rejected_counts() |
| |
| def test_one_router_large_message_rejected_counts(self): |
| self.one_router_rejected_counts(True) |
| |
| |
| class OneRouterReleasedDroppedPresettledTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| """Start three routers""" |
| super(OneRouterReleasedDroppedPresettledTest, cls).setUpClass() |
| |
| listen_port = cls.tester.get_port() |
| config = Qdrouterd.Config([ |
| ('router', {'mode': 'standalone', 'id': 'A'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| ('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})]) |
| |
| cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True) |
| |
| def one_router_released_dropped_count(self, large_message=False): |
| address = self.router.addresses[0] |
| |
| local_node = Node.connect(address, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| deliveries_dropped_presettled_index = outs.attribute_names.index('droppedPresettledDeliveries') |
| deliveries_released_index = outs.attribute_names.index('releasedDeliveries') |
| deliveries_presettled_index = outs.attribute_names.index('presettledDeliveries') |
| results = outs.results[0] |
| |
| deliveries_dropped_presettled_pre_test = results[deliveries_dropped_presettled_index] |
| deliveries_released_pre_test = results[deliveries_released_index] |
| deliveries_presettled_pre_test = results[deliveries_presettled_index] |
| num_messages = 20 |
| test = ReleasedDroppedPresettledCountTest(address, num_messages, large_message) |
| test.run() |
| |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| results = outs.results[0] |
| |
| self.assertEqual(results[deliveries_dropped_presettled_index] - deliveries_dropped_presettled_pre_test, 10) |
| self.assertEqual(results[deliveries_released_index] - deliveries_released_pre_test, 10) |
| self.assertEqual(results[deliveries_presettled_index] - deliveries_presettled_pre_test, 10) |
| |
| # check link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], test.n_sent) |
| self.assertEqual(test.sender_stats['releasedCount'], 10) |
| self.assertEqual(test.sender_stats['presettledCount'], 10) |
| self.assertEqual(test.sender_stats['droppedPresettledCount'], 10) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'releasedCount', |
| 'presettledCount', |
| 'droppedPresettledCount']))) |
| def test_one_router_released_dropped_counts(self): |
| self.one_router_released_dropped_count() |
| |
| def test_one_router_large_message_released_dropped_counts(self): |
| self.one_router_released_dropped_count(True) |
| |
| class TwoRouterReleasedDroppedPresettledTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| super(TwoRouterReleasedDroppedPresettledTest, cls).setUpClass() |
| |
| listen_port_1 = cls.tester.get_port() |
| listen_port_2 = cls.tester.get_port() |
| listen_port_inter_router = cls.tester.get_port() |
| |
| config_1 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'A'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| ('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ]) |
| |
| config_2 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'B'}), |
| ('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}), |
| ]) |
| |
| cls.routers = [] |
| cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True)) |
| cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True)) |
| cls.routers[1].wait_router_connected('A') |
| |
| def two_router_released_dropped_counts(self, large_message=False): |
| address = self.routers[0].addresses[0] |
| |
| # Send presettled and settled messages to router 1. |
| # Make sure the hello messages (which are presettled dont show up in the counts |
| |
| local_node = Node.connect(address, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| deliveries_dropped_presettled_index = outs.attribute_names.index('droppedPresettledDeliveries') |
| deliveries_released_index = outs.attribute_names.index('releasedDeliveries') |
| deliveries_presettled_index = outs.attribute_names.index('presettledDeliveries') |
| results = outs.results[0] |
| deliveries_dropped_presettled_pre_test = results[deliveries_dropped_presettled_index] |
| deliveries_released_pre_test = results[deliveries_released_index] |
| deliveries_presettled_pre_test = results[deliveries_presettled_index] |
| num_messages = 20 |
| test = ReleasedDroppedPresettledCountTest(address, num_messages, large_message) |
| test.run() |
| |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| results = outs.results[0] |
| |
| self.assertEqual(results[deliveries_dropped_presettled_index] - deliveries_dropped_presettled_pre_test, 10) |
| self.assertEqual(results[deliveries_released_index] - deliveries_released_pre_test, 10) |
| self.assertEqual(results[deliveries_presettled_index] - deliveries_presettled_pre_test, 10) |
| |
| # check link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], test.n_sent) |
| self.assertEqual(test.sender_stats['releasedCount'], 10) |
| self.assertEqual(test.sender_stats['presettledCount'], 10) |
| self.assertEqual(test.sender_stats['droppedPresettledCount'], 10) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'releasedCount', |
| 'presettledCount', |
| 'droppedPresettledCount']))) |
| |
| def test_two_router_released_dropped_counts(self): |
| self.two_router_released_dropped_counts() |
| |
| def test_two_router_large_message_released_dropped_counts(self): |
| self.two_router_released_dropped_counts(True) |
| |
| |
| class AddressCheckerTimeout ( object ): |
| def __init__(self, parent): |
| self.parent = parent |
| |
| def on_timer_task(self, event): |
| self.parent.address_check_timeout() |
| |
| |
| class CounterCheckerTimeout ( object ): |
| def __init__(self, parent): |
| self.parent = parent |
| |
| def on_timer_task(self, event): |
| self.parent.count_check_timeout() |
| |
| class LargePresettledLinkCounterTest(MessagingHandler): |
| def __init__(self, sender_addr, receiver_addr): |
| super(LargePresettledLinkCounterTest, self).__init__() |
| self.timer = None |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.receiver = None |
| self.error = None |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.num_messages = 25 |
| self.sender_addr = sender_addr |
| self.receiver_addr = receiver_addr |
| self.dest = "LargePresettledLinkCounterTest" |
| self.links = None |
| self.success = False |
| self.address_check_timer = None |
| self.container = None |
| self.num_attempts = 0 |
| self.reactor = None |
| self.done = False |
| |
| def check_if_done(self): |
| if self.done: |
| # Step 5: All messages have been received by receiver. |
| # Check the presettled count on the inter-router link of |
| # Router B (where the receiver is attached). |
| self.links = get_inter_router_links(self.receiver_addr) |
| for link in self.links: |
| # The self.num_messages + 1 is because before this test started the presettledCount was 1 |
| if link.get("linkDir") == "in" and link.get("presettledCount") == self.num_messages + 1: |
| self.success = True |
| break |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def address_check_timeout(self): |
| if has_mobile_dest_in_address_table(self.sender_addr, self.dest): |
| # Step 3: The address has propagated to Router A. Now attach a sender |
| # to router A. |
| self.sender_conn = self.container.connect(self.sender_addr) |
| self.sender = self.container.create_sender(self.sender_conn, |
| self.dest, |
| name='SenderA') |
| else: |
| if self.num_attempts < 2: |
| self.address_check_timer = self.reactor.schedule(2, |
| AddressCheckerTimeout(self)) |
| self.num_attempts += 1 |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d " % (self.n_sent, self.n_received) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def on_start(self, event): |
| self.container = event.container |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| # Step 1: Create a receiver with name ReceiverA to address LargePresettledLinkCounterTest |
| # This receiver is attached to router B. Later a sender will be |
| # created which will be connected to Router A. The sender will send |
| # on the same address that the receiver is receiving on. |
| self.receiver_conn = event.container.connect(self.receiver_addr) |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| self.dest, |
| name='ReceiverA') |
| |
| def on_link_opened(self, event): |
| self.reactor = event.reactor |
| if event.receiver: |
| # Step 2: The receiver link has been opened. |
| # Give 2 seconds for the address to propagate to the other router (Router A) |
| self.address_check_timer = event.reactor.schedule(2, AddressCheckerTimeout(self)) |
| self.num_attempts += 1 |
| |
| def on_sendable(self, event): |
| # Step 4: Send self.num_messages multi-frame large pre-settled messages. |
| # These messages will travel over inter-router link to Router B. |
| if self.n_sent < self.num_messages: |
| msg = Message(body=LARGE_PAYLOAD) |
| dlv = self.sender.send(msg) |
| # We are sending a pre-settled large multi frame message. |
| dlv.settle() |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if self.receiver == event.receiver: |
| self.n_received += 1 |
| if self.n_received == self.num_messages: |
| self.done = True |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class LargePresettledReleasedLinkCounterTest(MessagingHandler): |
| def __init__(self, sender_addr, receiver_addr): |
| super(LargePresettledReleasedLinkCounterTest, self).__init__(prefetch=0) |
| self.sender_addr = sender_addr |
| self.receiver_addr = receiver_addr |
| self.dest = "LargePresettledReleasedLinkCounterTest" |
| self.receiver_dropoff_count = 50 |
| self.num_messages = 200 |
| self.num_attempts = 0 |
| self.n_sent= 0 |
| self.done = False |
| self.n_received = 0 |
| self.count_check_timer = None |
| self.success = False |
| self.links = None |
| self.receiver_conn_closed = False |
| |
| def check_if_done(self): |
| # Step 6: |
| # Check the counts on the inter-router link of |
| # Router B (where the receiver is attached). There |
| # should be no released or modified messages. |
| self.links = get_inter_router_links(self.receiver_addr) |
| for link in self.links: |
| # We don't know how many deliveries got from one side of the |
| # inter-router link to the other but there should at least be as |
| # many as was sent to the receiver |
| if link.get("linkDir") == "in" \ |
| and link.get("presettledCount") > self.receiver_dropoff_count \ |
| and link.get("deliveryCount") > self.receiver_dropoff_count \ |
| and link.get("releasedCount") == 0\ |
| and link.get("modifiedCount") == 0: |
| self.success = True |
| break |
| self.sender_conn.close() |
| self.timer.cancel() |
| |
| def count_check_timeout(self): |
| self.check_if_done() |
| |
| def address_check_timeout(self): |
| if has_mobile_dest_in_address_table(self.sender_addr, self.dest): |
| # Step 3: The address has propagated to Router A. Now attach a sender |
| # to router A. |
| self.sender_conn = self.container.connect(self.sender_addr) |
| self.sender = self.container.create_sender(self.sender_conn, |
| self.dest, |
| name='SenderA') |
| else: |
| if self.num_attempts < 2: |
| self.address_check_timer = self.reactor.schedule(2, AddressCheckerTimeout(self)) |
| self.num_attempts += 1 |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d " % (self.n_sent, self.n_received) |
| self.sender_conn.close() |
| if not self.receiver_conn_closed: |
| self.receiver_conn.close() |
| |
| def on_start(self, event): |
| self.container = event.container |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| # Step 1: Create a receiver with name ReceiverA to address LargePresettledReleasedLinkCounterTest |
| # This receiver is attached to router B. Later a sender will be |
| # created which will be connected to Router A. The sender will send |
| # on the same address that the receiver is receiving on. |
| self.receiver_conn = event.container.connect(self.receiver_addr) |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| self.dest, |
| name='ReceiverA') |
| self.receiver.flow(self.receiver_dropoff_count) |
| |
| def on_link_opened(self, event): |
| self.reactor = event.reactor |
| if event.receiver: |
| # Step 2: The receiver link has been opened. |
| # Give 2 seconds for the address to propagate to the other router (Router A) |
| self.address_check_timer = event.reactor.schedule(2, AddressCheckerTimeout(self)) |
| self.num_attempts += 1 |
| |
| def on_sendable(self, event): |
| # Step 4: Send self.num_messages multi-frame large pre-settled messages. |
| # These messages will travel over inter-router link to Router B. |
| while self.n_sent < self.num_messages: |
| msg = Message(body=LARGE_PAYLOAD) |
| dlv = self.sender.send(msg) |
| # We are sending a pre-settled large multi frame message. |
| dlv.settle() |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if self.receiver == event.receiver and not self.done: |
| self.n_received += 1 |
| # Step 5: The receiver receives only 50 messages out of the 200 |
| # messages and drops out. |
| if self.n_received == self.receiver_dropoff_count: |
| self.done = True |
| self.receiver_conn.close() |
| self.receiver_conn_closed = True |
| self.count_check_timer = event.reactor.schedule(3, CounterCheckerTimeout(self)) |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class TwoRouterLargeMessagePresettledCountTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| super(TwoRouterLargeMessagePresettledCountTest, cls).setUpClass() |
| |
| listen_port_1 = cls.tester.get_port() |
| listen_port_2 = cls.tester.get_port() |
| listen_port_inter_router = cls.tester.get_port() |
| |
| config_1 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'A'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| ('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ]) |
| |
| config_2 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'B'}), |
| ('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}), |
| ]) |
| |
| cls.routers = [] |
| cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True)) |
| cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True)) |
| cls.routers[1].wait_router_connected('A') |
| |
| def test_verify_inter_router_presettled_count_DISPATCH_1540(self): |
| sender_address = self.routers[0].addresses[0] |
| receiver_address = self.routers[1].addresses[0] |
| # Sends presettled large messages across routers and checks |
| # the pre-settled count on the inter-router link of the downstream |
| # router (i.e. that to which receiver is attached) |
| # This test will fail if DISPATCH-1540 is not fixed since the |
| # pre-settled count will show zero |
| test = LargePresettledLinkCounterTest(sender_address, receiver_address) |
| test.run() |
| self.assertTrue(test.success) |
| |
| |
| class TwoRouterLargeMessagePresettledReleasedCountTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| super(TwoRouterLargeMessagePresettledReleasedCountTest, cls).setUpClass() |
| |
| listen_port_1 = cls.tester.get_port() |
| listen_port_2 = cls.tester.get_port() |
| listen_port_inter_router = cls.tester.get_port() |
| |
| config_1 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'A'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| ('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ]) |
| |
| config_2 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'B'}), |
| ('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}), |
| ]) |
| |
| cls.routers = [] |
| cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True)) |
| cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True)) |
| cls.routers[1].wait_router_connected('A') |
| |
| def test_verify_inter_router_presettled_released_count_DISPATCH_1541(self): |
| # This test sends presettled large messages across routers. A sender is on |
| # router A and a receiver on B. The sender sends 200 messages, the receiver |
| # receives 50 messages and goes away by closing its connection. There should be no released or |
| # modified messages on the incoming inter-router link on Router B |
| # This test will fail without the patch to DISPATCH-1541 |
| sender_address = self.routers[0].addresses[0] |
| receiver_address = self.routers[1].addresses[0] |
| test = LargePresettledReleasedLinkCounterTest(sender_address, receiver_address) |
| test.run() |
| self.assertTrue(test.success) |
| |
| |
| |
| class LinkRouteIngressEgressTransitTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| """Start three routers""" |
| super(LinkRouteIngressEgressTransitTest, cls).setUpClass() |
| |
| def router(name, connection): |
| |
| config = [ |
| ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}), |
| ] + connection |
| |
| config = Qdrouterd.Config(config) |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=False)) |
| |
| cls.routers = [] |
| a_listener_port = cls.tester.get_port() |
| b_listener_port = cls.tester.get_port() |
| c_listener_port = cls.tester.get_port() |
| test_tag_listener_port = cls.tester.get_port() |
| |
| router('A', |
| [ |
| ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| ]) |
| router('B', |
| [ |
| ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| ('listener', {'name': 'test-tag', 'role': 'route-container', 'host': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| |
| ('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| ('connector', {'name': 'routerC', 'role': 'inter-router', 'host': '0.0.0.0', 'port': c_listener_port}), |
| |
| |
| ('linkRoute', {'prefix': 'pulp.task', 'connection': 'broker', 'direction': 'in'}), |
| ('linkRoute', {'prefix': 'pulp.task', 'connection': 'broker', 'direction': 'out'}), |
| |
| ] |
| ) |
| router('C', |
| [ |
| ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}), |
| ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': c_listener_port, 'saslMechanisms': 'ANONYMOUS'}), |
| |
| ('linkRoute', {'prefix': 'pulp.task', 'direction': 'in'}), |
| ('linkRoute', {'prefix': 'pulp.task', 'direction': 'out'}), |
| ] |
| ) |
| |
| # Wait for the routers to locate each other, and for route propagation |
| # to settle |
| cls.routers[2].wait_router_connected('QDR.B') |
| cls.routers[1].wait_router_connected('QDR.C') |
| cls.routers[2].wait_address("pulp.task", remotes=1, delay=3, count=2) |
| |
| # This is not a classic router network in the sense that QDR.A and D are acting as brokers. We allow a little |
| # bit more time for the routers to stabilize. |
| sleep(2) |
| |
| def link_route_ingress_egress_transit_counts(self, large_message=False): |
| address1 = self.routers[2].addresses[0] |
| address2 = self.routers[2].addresses[0] |
| |
| local_node = Node.connect(address1, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress') |
| deliveries_egress_index = outs.attribute_names.index('deliveriesEgress') |
| deliveries_transit_index = outs.attribute_names.index('deliveriesTransit') |
| |
| results = outs.results[0] |
| |
| pre_ingress_count = results[deliveries_ingress_index] |
| pre_egress_count = results[deliveries_egress_index] |
| pre_transit_count = results[deliveries_transit_index] |
| |
| num_messages = 10 |
| # Send and receive on the same router, router C |
| test = IngressEgressTransitLinkRouteTest(address1, address2, num_messages, large_message=large_message) |
| test.run() |
| local_node = Node.connect(address1, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| results = outs.results[0] |
| |
| post_ingress_count = results[deliveries_ingress_index] |
| post_egress_count = results[deliveries_egress_index] |
| post_transit_count = results[deliveries_transit_index] |
| |
| # 10 messages entered the router, and 10 messages were echoed by router A and 3 mgmt requests |
| self.assertEqual(post_ingress_count - pre_ingress_count, 23) |
| |
| # 10 messages + 3 mgmt request |
| self.assertEqual(post_egress_count - pre_egress_count, 13) |
| |
| # 10 messages went out this router |
| self.assertEqual(post_transit_count - pre_transit_count, 10) |
| |
| # Check link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], 10) |
| self.assertEqual(test.sender_stats['acceptedCount'], 10) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| self.assertEqual(test.receiver_stats['deliveryCount'], 10) |
| self.assertEqual(test.receiver_stats['acceptedCount'], 10) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| def test_link_route_ingress_egress_transit_counts(self): |
| self.link_route_ingress_egress_transit_counts() |
| |
| def test_link_route_large_message_ingress_egress_transit_counts(self): |
| self.link_route_ingress_egress_transit_counts(True) |
| |
| class TwoRouterIngressEgressTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| super(TwoRouterIngressEgressTest, cls).setUpClass() |
| |
| listen_port_1 = cls.tester.get_port() |
| listen_port_2 = cls.tester.get_port() |
| listen_port_inter_router = cls.tester.get_port() |
| |
| config_1 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'A'}), |
| ('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ]) |
| |
| config_2 = Qdrouterd.Config([ |
| ('router', {'mode': 'interior', 'id': 'B'}), |
| ('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), |
| ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}), |
| ]) |
| |
| cls.routers = [] |
| cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True)) |
| cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True)) |
| cls.routers[1].wait_router_connected('A') |
| |
| def two_router_ingress_egress_counts(self, large_message=False): |
| in_router_addr = self.routers[0].addresses[0] |
| out_router_addr = self.routers[1].addresses[0] |
| |
| # Gather the values for deliveries_ingress and deliveries_egress before running the test. |
| |
| local_node = Node.connect(in_router_addr, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress') |
| results = outs.results[0] |
| |
| pre_deliveries_ingresss = results[deliveries_ingress_index] |
| |
| local_node = Node.connect(out_router_addr, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| deliveries_egress_index = outs.attribute_names.index('deliveriesEgress') |
| deliveries_accepted_index = outs.attribute_names.index('acceptedDeliveries') |
| results = outs.results[0] |
| |
| pre_deliveries_egress = results[deliveries_egress_index] |
| pre_deliveries_accepted = results[deliveries_accepted_index] |
| |
| # Now run the test. At the end of the test each router will be queried |
| # for the per-link stats |
| num_messages = 10 |
| test = IngressEgressTwoRouterTest(in_router_addr, out_router_addr, num_messages, large_message=large_message) |
| test.run() |
| |
| # Gather the values for deliveries_ingress and deliveries_egress after running the test. |
| local_node = Node.connect(in_router_addr, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| results = outs.results[0] |
| |
| post_deliveries_ingresss = results[deliveries_ingress_index] |
| |
| local_node = Node.connect(out_router_addr, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| results = outs.results[0] |
| |
| post_deliveries_egress = results[deliveries_egress_index] |
| post_deliveries_accepted = results[deliveries_accepted_index] |
| |
| accepted_deliveries_diff = post_deliveries_accepted - pre_deliveries_accepted |
| |
| # 12 = 10 msgs + 2 mgmt requests |
| self.assertEqual(post_deliveries_ingresss - pre_deliveries_ingresss, 12) |
| self.assertEqual(post_deliveries_egress - pre_deliveries_egress, 12) |
| |
| # check the link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.sender_stats['acceptedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| |
| self.assertEqual(test.receiver_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.receiver_stats['acceptedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| # The management requests are counted in the acceptedDeliveries, so it is difficult to measure the |
| # exact number of accepted deliveries at this point in time. But it must at least be 10 since |
| # we know for sure from the test that the 10 dispositions related to the 10 sent messages |
| # were definitely received |
| self.assertTrue(accepted_deliveries_diff >= num_messages) |
| |
| def test_two_router_ingress_egress_counts(self): |
| self.two_router_ingress_egress_counts() |
| |
| def test_two_router_large_message_ingress_egress_counts(self): |
| self.two_router_ingress_egress_counts(True) |
| |
| class OneRouterIngressEgressTest(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| """Start a router and a messenger""" |
| super(OneRouterIngressEgressTest, cls).setUpClass() |
| |
| listen_port = cls.tester.get_port() |
| config = Qdrouterd.Config([ |
| ('router', {'mode': 'standalone', 'id': 'A'}), |
| ('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})]) |
| |
| cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True) |
| |
| def one_router_ingress_egress_counts(self, large_message=False): |
| address = self.router.addresses[0] |
| |
| local_node = Node.connect(address, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress') |
| deliveries_egress_index = outs.attribute_names.index('deliveriesEgress') |
| results = outs.results[0] |
| deliveries_ingress_pre_test = results[deliveries_ingress_index] |
| deliveries_egress_pre_test = results[deliveries_egress_index] |
| |
| num_messages = 10 |
| test = IngressEgressOneRouterTest(address, num_messages, large_message=large_message) |
| test.run() |
| |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| results = outs.results[0] |
| |
| # 13 = ten msgs + 3 mgmt requests |
| self.assertEqual(results[deliveries_ingress_index] - deliveries_ingress_pre_test, 13) |
| # 12 = ten msgs + 2 mgmt requests |
| self.assertEqual(results[deliveries_egress_index] - deliveries_egress_pre_test, 13) |
| |
| # check the link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.sender_stats['acceptedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| self.assertEqual(test.receiver_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.receiver_stats['acceptedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| def test_one_router_ingress_egress_counts(self): |
| self.one_router_ingress_egress_counts() |
| |
| def test_one_router_large_message_ingress_egress_counts(self): |
| self.one_router_ingress_egress_counts(True) |
| |
| |
| class RouteContainerEgressCount(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| super(RouteContainerEgressCount, cls).setUpClass() |
| |
| config = Qdrouterd.Config([ |
| ('router', {'mode': 'standalone', 'id': 'QDR'}), |
| |
| # |
| # Create a general-purpose listener for sending and receiving deliveries |
| # |
| ('listener', {'port': cls.tester.get_port()}), |
| |
| # Create a route-container listener and give it a name myListener. |
| # Later on we will create an autoLink which has a connection property of myListener. |
| # |
| ('listener', {'role': 'route-container', 'name': 'myListener', 'port': cls.tester.get_port()}), |
| # |
| # Note here that the connection is set to a previously declared 'myListener' |
| # |
| ('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'in'}), |
| ('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'out'}), |
| ]) |
| |
| cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True) |
| |
| def route_container_egress(self , large_message=False): |
| regular_addr = self.router.addresses[0] |
| route_container_addr = self.router.addresses[1] |
| num_messages = 10 |
| local_node = Node.connect(regular_addr, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| deliveries_egress_route_container_index = outs.attribute_names.index('deliveriesEgressRouteContainer') |
| results = outs.results[0] |
| |
| deliveries_egress_pre_test = results[deliveries_egress_route_container_index] |
| |
| test = RouteContainerEgressTest(route_container_addr, regular_addr, num_messages, large_message=large_message) |
| test.run() |
| |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| deliveries_egress_route_container_index = outs.attribute_names.index('deliveriesEgressRouteContainer') |
| |
| results = outs.results[0] |
| # 11 = 10 msgs + 1 mgmt msg |
| self.assertEqual(results[deliveries_egress_route_container_index] - deliveries_egress_pre_test, 11) |
| |
| # check link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.sender_stats['acceptedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| self.assertEqual(test.receiver_stats['deliveryCount'], num_messages) |
| self.assertEqual(test.receiver_stats['acceptedCount'], num_messages) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| def test_route_container_egress_count(self): |
| self.route_container_egress() |
| |
| def test_route_container_large_message_egress_count(self): |
| self.route_container_egress(True) |
| |
| |
| class OneRouterLinkCountersTest(TestCase): |
| """ |
| A set of tests that validate link-level counters |
| """ |
| CREDIT = 20 # default issued by test receiver client |
| COUNT = 40 # default total msgs the sender client generates |
| |
| @classmethod |
| def setUpClass(cls): |
| # create one router |
| super(OneRouterLinkCountersTest, cls).setUpClass() |
| |
| listen_port = cls.tester.get_port() |
| config = Qdrouterd.Config([ |
| ('router', {'mode': 'standalone', 'id': 'LinkCounters'}), |
| ('listener', {'port': listen_port, |
| 'authenticatePeer': False, |
| 'saslMechanisms': 'ANONYMOUS', |
| 'linkCapacity': cls.CREDIT})]) |
| |
| cls.router = cls.tester.qdrouterd(name="LinkCounters", config=config, wait=True) |
| |
| |
| class LinkCountersTest(MessagingHandler): |
| """ |
| Create 1 sender and 1 receiver to router_addr. Send count messages. |
| The test ends when the receivers deliveryCount reaches rx_limit. |
| Explicitly set the receiver credit and whether to sender sends |
| presettled or unsettled messages. |
| """ |
| def __init__(self, router_addr, count=None, rx_limit=None, |
| credit=None, presettled=False, outcome=None, |
| large_message=False): |
| super(OneRouterLinkCountersTest.LinkCountersTest, |
| self).__init__(auto_accept=False, |
| auto_settle=False, |
| prefetch=0) |
| self.router_addr = router_addr |
| self.presettled = presettled |
| self.outcome = outcome |
| self.count = OneRouterLinkCountersTest.COUNT \ |
| if count is None else count |
| self.credit = OneRouterLinkCountersTest.COUNT \ |
| if credit is None else credit |
| self.rx_limit = OneRouterLinkCountersTest.COUNT \ |
| if rx_limit is None else rx_limit |
| |
| self.sent = 0 |
| self.timer = 0 |
| self.poll_timer = None |
| self.conn = None |
| self.sender_stats = None |
| self.receiver_stats = None |
| self.large_message = large_message |
| |
| def timeout(self): |
| self._cleanup() |
| |
| def _cleanup(self): |
| if self.conn: |
| self.conn.close() |
| self.conn = None |
| if self.poll_timer: |
| self.poll_timer.cancel() |
| self.poll_timer = None |
| if self.timer: |
| self.timer.cancel() |
| self.timer = None |
| |
| def poll_timeout(self): |
| """ |
| Periodically check the deliveryCount on the receiver. Once it |
| reaches rx_limit the test is complete: gather link statistics |
| before closing the clients |
| """ |
| li = get_link_info("Rx_Test01", self.router_addr) |
| if li and li['deliveryCount'] == self.rx_limit: |
| self.receiver_stats = li |
| self.sender_stats = get_link_info("Tx_Test01", self.router_addr) |
| self._cleanup() |
| else: |
| self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self)) |
| |
| def on_start(self, event): |
| self.reactor = event.reactor |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self)) |
| self.conn = event.container.connect(self.router_addr) |
| self.receiver = event.container.create_receiver(self.conn, |
| source="Test01", |
| name='Rx_Test01') |
| self.receiver.flow(self.credit) |
| self.sender = event.container.create_sender(self.conn, |
| target="Test01", |
| name="Tx_Test01") |
| def on_sendable(self, event): |
| if self.sent < self.count: |
| if self.large_message: |
| dlv = self.sender.send(Message(body=LARGE_PAYLOAD)) |
| else: |
| dlv = self.sender.send(Message(body="Test01")) |
| if self.presettled: |
| dlv.settle() |
| self.sent += 1 |
| |
| def on_message(self, event): |
| if self.outcome: |
| event.delivery.update(self.outcome) |
| event.delivery.settle() |
| # otherwise just drop it |
| |
| def run(self): |
| Container(self).run() |
| |
| def verify_released(self, large_message=False): |
| """ |
| Verify the link released count by releasing all received messages |
| """ |
| test = self.LinkCountersTest(self.router.addresses[0], |
| outcome=Delivery.RELEASED, |
| large_message=large_message) |
| test.run() |
| self.assertEqual(test.receiver_stats['deliveryCount'], self.COUNT) |
| self.assertEqual(test.receiver_stats['releasedCount'], self.COUNT) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'releasedCount']))) |
| |
| self.assertEqual(test.sender_stats['deliveryCount'], self.COUNT) |
| self.assertEqual(test.sender_stats['releasedCount'], self.COUNT) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'releasedCount']))) |
| |
| def verify_unsettled_count(self, large_message=False): |
| """ |
| Verify the link unsettled count by granting less credit than required |
| by the sender |
| """ |
| test = self.LinkCountersTest(self.router.addresses[0], |
| presettled=False, |
| count=self.COUNT, |
| rx_limit=self.CREDIT, |
| credit=self.CREDIT, |
| large_message=large_message) |
| test.run() |
| |
| # expect the receiver to get rx_limit worth of unsettled deliveries |
| self.assertEqual(test.receiver_stats['deliveryCount'], self.CREDIT) |
| self.assertEqual(test.receiver_stats['unsettledCount'], self.CREDIT) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'unsettledCount']))) |
| |
| # expect sender only to be able to send as much as credit |
| self.assertEqual(test.sender_stats['deliveryCount'], self.CREDIT) |
| self.assertEqual(test.sender_stats['unsettledCount'], self.CREDIT) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'unsettledCount']))) |
| |
| |
| def verify_presettled_count(self, large_message=False): |
| """ |
| Verify the presettled dropped count link counter by exhausting credit |
| before sending is complete |
| """ |
| limit = self.CREDIT//2 # 1/2 the capacity given the sender |
| test = self.LinkCountersTest(self.router.addresses[0], |
| presettled=True, |
| count=self.COUNT, |
| rx_limit=limit, |
| credit=limit, |
| large_message=large_message) |
| test.run() |
| |
| # since these are presettled the sender should have credit |
| # replenished by the router after each message. |
| self.assertEqual(test.sender_stats['deliveryCount'], self.COUNT) |
| self.assertEqual(test.sender_stats['presettledCount'], self.COUNT) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'presettledCount']))) |
| |
| # since credit is fixed at limit, exactly that number of msgs can be received |
| self.assertEqual(test.receiver_stats['deliveryCount'], limit) |
| |
| # verify that some messages were dropped and some are stuck on the |
| # undelivered list |
| self.assertTrue(test.receiver_stats['undeliveredCount'] > 0) |
| self.assertTrue(test.receiver_stats['droppedPresettledCount'] > 0) |
| |
| # expect that whatever was not dropped was delivered |
| self.assertEqual(test.receiver_stats['deliveryCount'], |
| (test.receiver_stats['presettledCount'] |
| - test.receiver_stats['droppedPresettledCount'])) |
| |
| # expect the sum of dropped+delivered+undelivered accounts for all |
| # messages sent |
| self.assertEqual(self.COUNT, |
| (test.receiver_stats['deliveryCount'] |
| + test.receiver_stats['undeliveredCount'] |
| + test.receiver_stats['droppedPresettledCount'])) |
| |
| # all other counters must be zero |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'undeliveredCount', |
| 'droppedPresettledCount', |
| 'presettledCount']))) |
| |
| def verify_one_credit_accepted(self, large_message=False): |
| """ |
| Verify counters on a credit-blocked link |
| """ |
| test = self.LinkCountersTest(self.router.addresses[0], |
| outcome=Delivery.ACCEPTED, |
| rx_limit=1, |
| credit=1) |
| test.run() |
| # expect only 1 delivery, an link credit worth of queued up messages |
| self.assertEqual(test.receiver_stats['deliveryCount'], 1) |
| self.assertEqual(test.receiver_stats['acceptedCount'], 1) |
| self.assertEqual(test.receiver_stats['undeliveredCount'], self.CREDIT) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'undeliveredCount', |
| 'acceptedCount']))) |
| |
| # expect that one message will be delivered, then link capacity |
| # messages will be enqueued internally |
| self.assertEqual(test.sender_stats['unsettledCount'], self.CREDIT) |
| self.assertEqual(test.sender_stats['deliveryCount'], self.CREDIT + 1) |
| self.assertEqual(test.sender_stats['acceptedCount'], 1) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS |
| - set(['deliveryCount', |
| 'unsettledCount', |
| 'acceptedCount']))) |
| |
| def test_01_presettled(self): |
| self.verify_presettled_count() |
| |
| def test_02_large_mesage_presettled(self): |
| self.verify_presettled_count(True) |
| |
| def test_03_unsettled(self): |
| self.verify_presettled_count() |
| |
| def test_04_large_message_unsettled(self): |
| self.verify_presettled_count(True) |
| |
| def test_05_released(self): |
| self.verify_released() |
| |
| def test_06_large_message_released(self): |
| self.verify_released(True) |
| |
| def test_07_one_credit_accepted(self): |
| self.verify_one_credit_accepted() |
| |
| def test_08_large_message_one_credit_accepted(self): |
| self.verify_one_credit_accepted(True) |
| |
| |
| |
| class RouteContainerIngressCount(TestCase): |
| @classmethod |
| def setUpClass(cls): |
| super(RouteContainerIngressCount, cls).setUpClass() |
| |
| config = Qdrouterd.Config([ |
| ('router', {'mode': 'standalone', 'id': 'QDR'}), |
| |
| # |
| # Create a general-purpose listener for sending and receiving deliveries |
| # |
| ('listener', {'port': cls.tester.get_port()}), |
| |
| # Create a route-container listener and give it a name myListener. |
| # Later on we will create an autoLink which has a connection property of myListener. |
| # |
| ('listener', {'role': 'route-container', 'name': 'myListener', 'port': cls.tester.get_port()}), |
| # |
| # Note here that the connection is set to a previously declared 'myListener' |
| # |
| ('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'in'}), |
| ('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'out'}), |
| ]) |
| |
| cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True) |
| |
| def test_route_container_ingress(self): |
| regular_addr = self.router.addresses[0] |
| route_container_addr = self.router.addresses[1] |
| test = RouteContainerIngressTest(route_container_addr, regular_addr) |
| test.run() |
| |
| local_node = Node.connect(regular_addr, timeout=TIMEOUT) |
| outs = local_node.query(type='org.apache.qpid.dispatch.router') |
| |
| deliveries_ingress_route_container_index = outs.attribute_names.index('deliveriesIngressRouteContainer') |
| |
| results = outs.results[0] |
| # 22 = 20 msgs + 2 mgmt msgs |
| self.assertEqual(results[deliveries_ingress_route_container_index], 22) |
| |
| # check link statistics |
| self.assertEqual(test.sender_stats['deliveryCount'], 10) |
| self.assertEqual(test.sender_stats['acceptedCount'], 10) |
| self.assertTrue(_link_stats_are_zero(test.sender_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| self.assertEqual(test.sender1_stats['deliveryCount'], 10) |
| self.assertEqual(test.sender1_stats['acceptedCount'], 10) |
| self.assertTrue(_link_stats_are_zero(test.sender1_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| self.assertEqual(test.receiver_stats['deliveryCount'], 20) |
| self.assertEqual(test.receiver_stats['acceptedCount'], 20) |
| self.assertTrue(_link_stats_are_zero(test.receiver_stats, |
| _LINK_STATISTIC_KEYS - set(['deliveryCount', |
| 'acceptedCount']))) |
| |
| |
| class IngressEgressTwoRouterTest(MessagingHandler): |
| def __init__(self, sender_address, receiver_address, num_messages, large_message=False): |
| super(IngressEgressTwoRouterTest, self).__init__() |
| self.sender = None |
| self.receiver = None |
| self.conn_sender = None |
| self.conn_recv = None |
| self.timer = None |
| self.dest = 'examples' |
| self.sender_address = sender_address |
| self.receiver_address = receiver_address |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.num_messages = num_messages |
| self.start = False |
| self.n_accept = 0 |
| self.sender_stats = None |
| self.receiver_stats = None |
| self.done = False |
| self.large_message = large_message |
| |
| def timeout(self): |
| self.conn_sender.close() |
| self.conn_recv.close() |
| |
| def check_if_done(self): |
| if not self.done and self.num_messages == self.n_received and self.n_accept == self.num_messages: |
| self.done = True |
| self.sender_stats = get_link_info('Tx_IngressEgressTwoRouterTest', |
| self.sender_address) |
| self.receiver_stats = get_link_info('Rx_IngressEgressTwoRouterTest', |
| self.receiver_address) |
| self.conn_sender.close() |
| self.conn_recv.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn_recv = event.container.connect(self.receiver_address) |
| self.receiver = event.container.create_receiver(self.conn_recv, |
| source=self.dest, |
| name='Rx_IngressEgressTwoRouterTest') |
| |
| def on_sendable(self, event): |
| if not self.start: |
| return |
| |
| if self.n_sent < self.num_messages: |
| msg = Message(body=get_body(self.n_sent, self.large_message)) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.start = True |
| self.conn_sender = event.container.connect(self.sender_address) |
| self.sender = event.container.create_sender(self.conn_sender, |
| target=self.dest, |
| name='Tx_IngressEgressTwoRouterTest') |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.n_received += 1 |
| |
| def on_accepted(self, event): |
| if event.sender: |
| self.n_accept += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class IngressEgressOneRouterTest(MessagingHandler): |
| def __init__(self, address, num_messages, large_message=False): |
| super(IngressEgressOneRouterTest, self).__init__() |
| self.sender = None |
| self.receiver = None |
| self.conn = None |
| self.timer = None |
| self.dest = 'examples' |
| self.address = address |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.n_accepted = 0 |
| self.num_messages = num_messages |
| self.sender_stats = None |
| self.receiver_stats = None |
| self.done = False |
| self.large_message = large_message |
| |
| def timeout(self): |
| self.conn.close() |
| |
| def check_if_done(self): |
| if not self.done and (self.n_sent == self.n_received |
| and self.n_sent == self.n_accepted): |
| self.done = True |
| self.sender_stats = get_link_info('Tx_IngressEgressOneRouterTest', self.address) |
| self.receiver_stats = get_link_info('Rx_IngressEgressOneRouterTest', self.address) |
| self.conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.conn = event.container.connect(self.address) |
| self.sender = event.container.create_sender(self.conn, |
| target=self.dest, |
| name='Tx_IngressEgressOneRouterTest') |
| self.receiver = event.container.create_receiver(self.conn, |
| source=self.dest, |
| name='Rx_IngressEgressOneRouterTest') |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.num_messages: |
| msg = Message(body=get_body(self.n_sent, self.large_message)) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.n_received += 1 |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class RouteContainerEgressTest(MessagingHandler): |
| def __init__(self, route_container_addr, sender_addr, num_messages, large_message=False): |
| super(RouteContainerEgressTest, self).__init__() |
| self.sender_addr = sender_addr |
| self.route_container_addr = route_container_addr |
| self.timer = None |
| self.error = None |
| self.receiver = None |
| self.receiver_conn = None |
| self.dest = "myListener.1" |
| self.sender_conn = None |
| self.sender = None |
| self.start = False |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.n_accepted = 0 |
| self.num_messages = num_messages |
| self.sender_stats = None |
| self.receiver_stats = None |
| self.done = False |
| |
| def check_if_done(self): |
| if not self.done and (self.n_sent == self.n_received |
| and self.n_sent == self.n_accepted): |
| self.done = True |
| self.sender_stats = get_link_info('Tx_RouteContainerEgressTest', self.sender_addr) |
| self.receiver_stats = get_link_info('Rx_RouteContainerEgressTest', self.route_container_addr) |
| self.receiver_conn.close() |
| self.sender_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.receiver_conn = event.container.connect(self.route_container_addr) |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| source=self.dest, |
| name='Rx_RouteContainerEgressTest') |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def on_sendable(self, event): |
| if not self.start: |
| return |
| |
| if self.n_sent < self.num_messages: |
| msg = Message(body={'number': self.n_sent}) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.n_received += 1 |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| self.check_if_done() |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.start = True |
| self.sender_conn = event.container.connect(self.sender_addr) |
| self.sender = event.container.create_sender(self.sender_conn, |
| target=self.dest, |
| name='Tx_RouteContainerEgressTest') |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class RouteContainerIngressTest(MessagingHandler): |
| def __init__(self, route_container_addr, receiver_addr): |
| super(RouteContainerIngressTest, self).__init__() |
| self.receiver_addr = receiver_addr |
| self.route_container_addr = route_container_addr |
| self.timer = None |
| self.error = None |
| self.receiver = None |
| self.receiver_conn = None |
| self.dest = "myListener.1" |
| self.sender_conn = None |
| self.sender = None |
| self.sender1 = None |
| self.start = False |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.n_accepted = 0 |
| self.num_messages = 20 |
| self.sender_stats = None |
| self.sender1_stats = None |
| self.receiver_stats = None |
| self.done = False |
| |
| def check_if_done(self): |
| if not self.done and (self.n_sent == self.n_received |
| and self.n_sent == self.n_accepted): |
| self.done = True |
| self.sender_stats = get_link_info('A', self.route_container_addr) |
| self.sender1_stats = get_link_info('B', self.route_container_addr) |
| self.receiver_stats = get_link_info('Rx_RouteContainerIngressTest', self.receiver_addr) |
| self.receiver_conn.close() |
| self.sender_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.receiver_conn = event.container.connect(self.receiver_addr) |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| source=self.dest, |
| name='Rx_RouteContainerIngressTest') |
| self.sender_conn = event.container.connect(self.route_container_addr) |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def on_sendable(self, event): |
| if not self.start: |
| return |
| |
| if self.n_sent < self.num_messages: |
| msg = Message(body={'number': self.n_sent}) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| self.sender1.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.n_received += 1 |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| self.check_if_done() |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.start = True |
| # Create 2 senders. Each sender will send 10 messages each |
| self.sender = event.container.create_sender(self.sender_conn, self.dest, name="A") |
| self.sender1 = event.container.create_sender(self.sender_conn, self.dest, name="B") |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class IngressEgressTransitLinkRouteTest(MessagingHandler): |
| def __init__(self, sender_addr, receiver_addr, num_messages, large_message=False): |
| super(IngressEgressTransitLinkRouteTest, self).__init__() |
| self.timer = None |
| self.receiver_conn = None |
| self.receiver = None |
| self.sender = None |
| self.sender_conn = None |
| self.dest = "pulp.task" |
| self.start = False |
| self.n_sent = 0 |
| self.num_messages = num_messages |
| self.n_received = 0 |
| self.n_accepted = 0 |
| self.sender_addr = sender_addr |
| self.receiver_addr = receiver_addr |
| self.error = None |
| self.sender_stats = None |
| self.receiver_stats = None |
| self.done = False |
| self.large_message = large_message |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def check_if_done(self): |
| if not self.done and (self.n_sent == self.n_received |
| and self.n_sent == self.n_accepted): |
| self.done = True |
| self.sender_stats = get_link_info('Tx_IngressEgressTransitLinkRouteTest', |
| self.sender_addr) |
| self.receiver_stats = get_link_info('Rx_IngressEgressTransitLinkRouteTest', |
| self.receiver_addr) |
| self.receiver_conn.close() |
| self.sender_conn.close() |
| self.timer.cancel() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.receiver_conn = event.container.connect(self.receiver_addr) |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| source=self.dest, |
| name='Rx_IngressEgressTransitLinkRouteTest') |
| |
| def on_link_opened(self, event): |
| if event.receiver == self.receiver: |
| self.sender_conn = event.container.connect(self.sender_addr) |
| self.sender = event.container.create_sender(self.sender_conn, |
| target=self.dest, |
| name='Tx_IngressEgressTransitLinkRouteTest') |
| self.start = True |
| |
| def on_sendable(self, event): |
| if not self.start: |
| return |
| |
| if self.n_sent < self.num_messages: |
| msg = Message(body=get_body(self.n_sent, self.large_message)) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| if event.receiver == self.receiver: |
| self.n_received += 1 |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class ReleasedDroppedPresettledCountTest(MessagingHandler): |
| def __init__(self, sender_addr, num_messages, large_message=False): |
| super(ReleasedDroppedPresettledCountTest, self).__init__() |
| self.timer = None |
| self.sender_conn = None |
| self.sender = None |
| self.error = None |
| self.n_sent = 0 |
| self.num_messages = num_messages |
| self.sender_addr = sender_addr |
| self.sender_stats = None |
| |
| # We are sending to a multicast address |
| self.dest = "multicast" |
| self.n_released = 0 |
| self.expect_released = 10 |
| self.done = False |
| self.large_message = large_message |
| |
| def check_if_done(self): |
| if not self.done and self.expect_released == self.n_released: |
| self.done = True |
| self.sender_stats = get_link_info('ReleasedDroppedPresettledCountTest', |
| self.sender_addr) |
| self.sender_conn.close() |
| self.timer.cancel() |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d, self.self.n_released=%d " % (self.n_sent, self.n_released) |
| self.sender_conn.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.sender_conn = event.container.connect(self.sender_addr) |
| |
| # Note that this is an anonymous link which will be granted credit w/o |
| # blocking for consumers. Therefore all messages sent to this address |
| # will be dropped |
| self.sender = event.container.create_sender(self.sender_conn, |
| name='ReleasedDroppedPresettledCountTest') |
| def on_sendable(self, event): |
| # We are sending a total of 20 deliveries. 10 unsettled and 10 pre-settled to a multicast address |
| if self.n_sent < self.num_messages: |
| msg = Message(body=get_body(self.n_sent, self.large_message)) |
| msg.address = self.dest |
| dlv = self.sender.send(msg) |
| if self.n_sent < 10: |
| dlv.settle() |
| self.n_sent += 1 |
| |
| def on_released(self, event): |
| self.n_released += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class RejectedDeliveriesTest(MessagingHandler): |
| def __init__(self, addr, num_messages, large_message=False): |
| super(RejectedDeliveriesTest, self).__init__(auto_accept=False) |
| self.addr = addr |
| self.dest = "someaddress" |
| self.error = None |
| self.n_sent = 0 |
| self.num_messages = num_messages |
| self.n_rejected = 0 |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.timer = None |
| self.sender = None |
| self.receiver = None |
| self.sender_stats = None |
| self.receiver_stats = None |
| self.done = False |
| self.large_message = large_message |
| |
| def check_if_done(self): |
| if not self.done and self.n_rejected == self.num_messages: |
| self.done = True |
| self.sender_stats = get_link_info('Tx_RejectedDeliveriesTest', self.addr) |
| self.receiver_stats = get_link_info('Rx_RejectedDeliveriesTest', self.addr) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d, self.self.n_rejected=%d " % (self.n_sent, self.n_rejected) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.sender_conn = event.container.connect(self.addr) |
| self.sender = event.container.create_sender(self.sender_conn, |
| target=self.dest, |
| name='Tx_RejectedDeliveriesTest') |
| self.receiver_conn = event.container.connect(self.addr) |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| source=self.dest, |
| name='Rx_RejectedDeliveriesTest') |
| |
| def on_rejected(self, event): |
| self.n_rejected += 1 |
| self.check_if_done() |
| |
| def on_message(self, event): |
| # We will reject every delivery we receive. |
| self.reject(event.delivery) |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.num_messages: |
| msg = Message(body=get_body(self.n_sent, self.large_message)) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class ModifiedDeliveriesTest(MessagingHandler): |
| def __init__(self, addr, num_messages, large_message=False): |
| super(ModifiedDeliveriesTest, self).__init__(auto_accept=False) |
| self.addr = addr |
| self.dest = "someaddress" |
| self.error = None |
| self.n_sent = 0 |
| self.num_messages = num_messages |
| self.n_modified = 0 |
| self.sender_conn = None |
| self.receiver_conn = None |
| self.timer = None |
| self.sender = None |
| self.receiver = None |
| self.n_received = 0 |
| self.sender_stats = None |
| self.receiver_stats = None |
| self.done = False |
| self.large_message = large_message |
| |
| def check_if_done(self): |
| if not self.done and self.n_modified == self.num_messages: |
| self.done = True |
| self.sender_stats = get_link_info('Tx_ModifiedDeliveriesTest', self.addr) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| self.timer.cancel() |
| |
| def timeout(self): |
| self.error = "Timeout Expired: self.n_sent=%d, self.self.n_modified=%d " % (self.n_sent, self.n_modified) |
| self.sender_conn.close() |
| self.receiver_conn.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) |
| self.sender_conn = event.container.connect(self.addr) |
| self.sender = event.container.create_sender(self.sender_conn, |
| target=self.dest, |
| name='Tx_ModifiedDeliveriesTest') |
| self.receiver_conn = event.container.connect(self.addr) |
| self.receiver = event.container.create_receiver(self.receiver_conn, |
| source=self.dest, |
| name='Rx_ModifiedDeliveriesTest') |
| |
| def on_released(self, event): |
| if event.delivery.remote_state == Delivery.MODIFIED: |
| self.n_modified += 1 |
| self.check_if_done() |
| |
| def on_message(self, event): |
| # The messages have arrived at the receiver but we will not settle the message and instead just closed the |
| # connection. Since the router did not receive the acknowledgements, it will send back MODIFIED dispositions |
| # to the sender. |
| self.n_received += 1 |
| # After 10 messages are received, simply close the receiver connection without acknowledging the messages |
| if self.n_received == self.num_messages: |
| self.receiver_stats = get_link_info('Rx_ModifiedDeliveriesTest', self.addr) |
| self.receiver_conn.close() |
| |
| def on_sendable(self, event): |
| if self.n_sent < self.num_messages: |
| msg = Message(body=get_body(self.n_sent, self.large_message)) |
| self.sender.send(msg) |
| self.n_sent += 1 |
| |
| def run(self): |
| Container(self).run() |