| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| # |
| # Test the multicast forwarder |
| # |
| |
| from __future__ import unicode_literals |
| from __future__ import division |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| import sys |
| from time import sleep |
| |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container |
| from proton.reactor import LinkOption |
| from proton import Connection |
| from proton import Link |
| from proton import Message |
| from proton import Delivery |
| from qpid_dispatch.management.client import Node |
| from system_test import AsyncTestSender |
| from system_test import AsyncTestReceiver |
| from system_test import TestCase |
| from system_test import Qdrouterd |
| from system_test import main_module |
| from system_test import TIMEOUT |
| from system_test import TestTimeout |
| from system_test import unittest |
| |
| |
| MAX_FRAME=1023 |
| LINK_CAPACITY=250 |
| W_THREADS=2 |
| LARGE_PAYLOAD = ("X" * MAX_FRAME) * 19 |
| |
| # check for leaks of the following entities |
| ALLOC_STATS=["qd_message_t", |
| "qd_buffer_t", |
| "qdr_delivery_t"] |
| |
| |
| class MulticastLinearTest(TestCase): |
| """ |
| Verify the multicast forwarding logic across a multihop linear router |
| configuration |
| """ |
| @classmethod |
| def setUpClass(cls): |
| """Start a router""" |
| super(MulticastLinearTest, cls).setUpClass() |
| |
| def router(name, mode, extra): |
| config = [ |
| ('router', {'mode': mode, |
| 'id': name, |
| 'allowUnsettledMulticast': 'yes', |
| 'workerThreads': W_THREADS}), |
| ('listener', {'role': 'normal', |
| 'port': cls.tester.get_port(), |
| 'maxFrameSize': MAX_FRAME, |
| 'linkCapacity': LINK_CAPACITY}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| ] |
| |
| if extra: |
| config.extend(extra) |
| config = Qdrouterd.Config(config) |
| cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) |
| return cls.routers[-1] |
| |
| # configuration: |
| # two edge routers connected via 2 interior routers. |
| # |
| # +-------+ +---------+ +---------+ +-------+ |
| # | EA1 |<==>| INT.A |<==>| INT.B |<==>| EB1 | |
| # +-------+ +---------+ +---------+ +-------+ |
| # |
| # Each router has 2 multicast consumers |
| # EA1 and INT.A each have a multicast sender |
| |
| cls.routers = [] |
| |
| interrouter_port = cls.tester.get_port() |
| cls.INTA_edge_port = cls.tester.get_port() |
| cls.INTB_edge_port = cls.tester.get_port() |
| |
| router('INT.A', 'interior', |
| [('listener', {'role': 'inter-router', |
| 'port': interrouter_port}), |
| ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})]) |
| cls.INT_A = cls.routers[0] |
| cls.INT_A.listener = cls.INT_A.addresses[0] |
| |
| router('INT.B', 'interior', |
| [('connector', {'name': 'connectorToA', |
| 'role': 'inter-router', |
| 'port': interrouter_port}), |
| ('listener', {'role': 'edge', |
| 'port': cls.INTB_edge_port})]) |
| |
| cls.INT_B = cls.routers[1] |
| cls.INT_B.listener = cls.INT_B.addresses[0] |
| |
| router('EA1', 'edge', |
| [('listener', {'name': 'rc', 'role': 'route-container', |
| 'port': cls.tester.get_port()}), |
| ('connector', {'name': 'uplink', 'role': 'edge', |
| 'port': cls.INTA_edge_port}), |
| ('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'in'}), |
| ('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'out'})]) |
| cls.EA1 = cls.routers[2] |
| cls.EA1.listener = cls.EA1.addresses[0] |
| cls.EA1.route_container = cls.EA1.addresses[1] |
| |
| router('EB1', 'edge', |
| [('connector', {'name': 'uplink', |
| 'role': 'edge', |
| 'port': cls.INTB_edge_port, |
| 'maxFrameSize': 1024}), |
| ('listener', {'name': 'rc', 'role': 'route-container', |
| 'port': cls.tester.get_port()}), |
| ('linkRoute', {'pattern': '*.cfg.pattern.#', 'containerId': 'FakeBroker', 'direction': 'in'}), |
| ('linkRoute', {'pattern': '*.cfg.pattern.#', 'containerId': 'FakeBroker', 'direction': 'out'})]) |
| cls.EB1 = cls.routers[3] |
| cls.EB1.listener = cls.EB1.addresses[0] |
| cls.EB1.route_container = cls.EB1.addresses[1] |
| |
| cls.INT_A.wait_router_connected('INT.B') |
| cls.INT_B.wait_router_connected('INT.A') |
| cls.EA1.wait_connectors() |
| cls.EB1.wait_connectors() |
| |
| # Client topology: |
| # all routes have 2 receivers |
| # Edge router EA1 and interior INT_A have a sender each |
| # |
| cls.config = [ |
| # edge router EA1: |
| {'router': cls.EA1, |
| 'senders': ['S-EA1-1'], |
| 'receivers': ['R-EA1-1', 'R-EA1-2'], |
| 'subscribers': 2, |
| 'remotes': 0 |
| }, |
| # Interior router INT_A: |
| {'router': cls.INT_A, |
| 'senders': ['S-INT_A-1'], |
| 'receivers': ['R-INT_A-1', 'R-INT_A-2'], |
| 'subscribers': 3, |
| 'remotes': 1, |
| }, |
| # Interior router INT_B: |
| {'router': cls.INT_B, |
| 'senders': [], |
| 'receivers': ['R-INT_B-1', 'R-INT_B-2'], |
| 'subscribers': 3, |
| 'remotes': 1, |
| }, |
| # edge router EB1 |
| {'router': cls.EB1, |
| 'senders': [], |
| 'receivers': ['R-EB1-1', 'R-EB1-2'], |
| 'subscribers': 2, |
| 'remotes': 0, |
| } |
| ] |
| |
| def _get_alloc_stats(self, router, stats): |
| # return a map of the current allocator counters for each entity type |
| # name in stats |
| |
| # |
| # 57: END = [{u'heldByThreads': int32(384), u'typeSize': int32(536), |
| # u'transferBatchSize': int32(64), u'globalFreeListMax': int32(0), |
| # u'batchesRebalancedToGlobal': int32(774), u'typeName': |
| # u'qd_buffer_t', u'batchesRebalancedToThreads': int32(736), |
| # u'totalFreeToHeap': int32(0), u'totalAllocFromHeap': int32(2816), |
| # u'localFreeListMax': int32(128), u'type': |
| # u'org.apache.qpid.dispatch.allocator', u'identity': |
| # u'allocator/qd_buffer_t', u'name': u'allocator/qd_buffer_t'}] |
| |
| d = dict() |
| mgmt = router.management |
| atype = 'org.apache.qpid.dispatch.allocator' |
| q = mgmt.query(type=atype).get_dicts() |
| for name in stats: |
| d[name] = list(filter(lambda a: a['typeName'] == name, q))[0] |
| return d |
| |
| def _check_for_leaks(self): |
| for r in self.routers: |
| stats = self._get_alloc_stats(r, ALLOC_STATS) |
| for name in ALLOC_STATS: |
| # ensure threads haven't leaked |
| max_allowed = ((W_THREADS + 1) |
| * stats[name]['localFreeListMax']) |
| held = stats[name]['heldByThreads'] |
| if held >= (2 * max_allowed): |
| print("OOPS!!! %s: (%s) - held=%d max=%d\n %s\n" |
| % (r.config.router_id, |
| name, held, max_allowed, stats)) |
| sys.stdout.flush() |
| self.assertFalse(held >= (2 * max_allowed)) |
| |
| # |
| # run all the negative tests first so that if we screw up the internal |
| # state of the brokers the positive tests will likely fail |
| # |
| |
| def _presettled_large_msg_rx_detach(self, config, count, drop_clients): |
| # detach receivers during receive |
| body = " MCAST PRESETTLED LARGE RX DETACH " + LARGE_PAYLOAD |
| test = MulticastPresettledRxFail(config, count, |
| drop_clients, |
| detach=True, |
| body=body) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_01_presettled_large_msg_rx_detach(self): |
| self._presettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2']) |
| self._presettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1']) |
| |
| def _presettled_large_msg_rx_close(self, config, count, drop_clients): |
| # close receiver connections during receive |
| body = " MCAST PRESETTLED LARGE RX CLOSE " + LARGE_PAYLOAD |
| test = MulticastPresettledRxFail(config, count, |
| drop_clients, |
| detach=False, |
| body=body) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_02_presettled_large_msg_rx_close(self): |
| self._presettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1']) |
| self._presettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2']) |
| |
| def _unsettled_large_msg_rx_detach(self, config, count, drop_clients): |
| # detach receivers during the test |
| body = " MCAST UNSETTLED LARGE RX DETACH " + LARGE_PAYLOAD |
| test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=True, body=body) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_10_unsettled_large_msg_rx_detach(self): |
| self._unsettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2']) |
| self._unsettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1']) |
| |
| def _unsettled_large_msg_rx_close(self, config, count, drop_clients): |
| # close receiver connections during test |
| body = " MCAST UNSETTLED LARGE RX CLOSE " + LARGE_PAYLOAD |
| test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=False, body=body) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_11_unsettled_large_msg_rx_close(self): |
| self._unsettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1', ]) |
| self._unsettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2']) |
| |
| # |
| # now the positive tests |
| # |
| |
| def test_50_presettled(self): |
| # Simply send a bunch of pre-settled multicast messages |
| body = " MCAST PRESETTLED " |
| test = MulticastPresettled(self.config, 10, body, SendPresettled()) |
| test.run() |
| |
| def test_51_presettled_mixed_large_msg(self): |
| # Same as above, but large message bodies (mixed sender settle mode) |
| body = " MCAST MAYBE PRESETTLED LARGE " + LARGE_PAYLOAD |
| test = MulticastPresettled(self.config, 11, body, SendMixed()) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_52_presettled_large_msg(self): |
| # Same as above, (pre-settled sender settle mode) |
| body = " MCAST PRESETTLED LARGE " + LARGE_PAYLOAD |
| test = MulticastPresettled(self.config, 13, body, SendPresettled()) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_60_unsettled_3ack(self): |
| # Sender sends unsettled, waits for Outcome from Receiver then settles |
| # Expect all messages to be accepted |
| body = " MCAST UNSETTLED " |
| test = MulticastUnsettled3Ack(self.config, 10, body) |
| test.run() |
| self.assertEqual(None, test.error) |
| self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent) |
| |
| def test_61_unsettled_3ack_large_msg(self): |
| # Same as above but with multiframe streaming |
| body = " MCAST UNSETTLED LARGE " + LARGE_PAYLOAD |
| test = MulticastUnsettled3Ack(self.config, 11, body=body) |
| test.run() |
| self.assertEqual(None, test.error) |
| self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent) |
| |
| def _unsettled_3ack_outcomes(self, |
| config, |
| count, |
| outcomes, |
| expected): |
| body = " MCAST UNSETTLED 3ACK OUTCOMES " + LARGE_PAYLOAD |
| test = MulticastUnsettled3Ack(self.config, |
| count, |
| body, |
| outcomes=outcomes) |
| test.run() |
| self.assertEqual(None, test.error) |
| self.assertEqual(test.n_outcomes[expected], test.n_sent) |
| |
| def test_63_unsettled_3ack_outcomes(self): |
| # Verify the expected outcome is returned to the sender when the |
| # receivers return different outcome values. If no outcome is |
| # specified for a receiver it will default to ACCEPTED |
| |
| # expect REJECTED if any reject: |
| self._unsettled_3ack_outcomes(self.config, 3, |
| {'R-EB1-1': Delivery.REJECTED, |
| 'R-EB1-2': Delivery.MODIFIED, |
| 'R-INT_B-2': Delivery.RELEASED}, |
| Delivery.REJECTED) |
| self._unsettled_3ack_outcomes(self.config, 3, |
| {'R-EB1-1': Delivery.REJECTED, |
| 'R-INT_B-2': Delivery.RELEASED}, |
| Delivery.REJECTED) |
| # expect ACCEPT if no rejects |
| self._unsettled_3ack_outcomes(self.config, 3, |
| {'R-EB1-2': Delivery.MODIFIED, |
| 'R-INT_B-2': Delivery.RELEASED}, |
| Delivery.ACCEPTED) |
| # expect MODIFIED over RELEASED |
| self._unsettled_3ack_outcomes(self.config, 3, |
| {'R-EA1-1': Delivery.RELEASED, |
| 'R-EA1-2': Delivery.RELEASED, |
| 'R-INT_A-1': Delivery.RELEASED, |
| 'R-INT_A-2': Delivery.RELEASED, |
| 'R-INT_B-1': Delivery.RELEASED, |
| 'R-INT_B-2': Delivery.RELEASED, |
| 'R-EB1-1': Delivery.RELEASED, |
| 'R-EB1-2': Delivery.MODIFIED}, |
| Delivery.MODIFIED) |
| |
| # and released only if all released |
| self._unsettled_3ack_outcomes(self.config, 3, |
| {'R-EA1-1': Delivery.RELEASED, |
| 'R-EA1-2': Delivery.RELEASED, |
| 'R-INT_A-1': Delivery.RELEASED, |
| 'R-INT_A-2': Delivery.RELEASED, |
| 'R-INT_B-1': Delivery.RELEASED, |
| 'R-INT_B-2': Delivery.RELEASED, |
| 'R-EB1-1': Delivery.RELEASED, |
| 'R-EB1-2': Delivery.RELEASED}, |
| Delivery.RELEASED) |
| |
| def test_70_unsettled_1ack(self): |
| # Sender sends unsettled, expects both outcome and settlement from |
| # receiver before sender settles locally |
| body = " MCAST UNSETTLED 1ACK " |
| test = MulticastUnsettled1Ack(self.config, 10, body) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_71_unsettled_1ack_large_msg(self): |
| # Same as above but with multiframe streaming |
| body = " MCAST UNSETTLED 1ACK LARGE " + LARGE_PAYLOAD |
| test = MulticastUnsettled1Ack(self.config, 10, body) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_80_unsettled_3ack_message_annotations(self): |
| body = " MCAST UNSETTLED 3ACK LARGE MESSAGE ANNOTATIONS " + LARGE_PAYLOAD |
| test = MulticastUnsettled3AckMA(self.config, 10, body) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_90_credit_no_subscribers(self): |
| """ |
| Verify that multicast senders are blocked until a consumer is present. |
| """ |
| test = MulticastCreditBlocked(address=self.EA1.listener, |
| target='multicast/no/subscriber1') |
| |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| test = MulticastCreditBlocked(address=self.INT_A.listener, |
| target='multicast/no/subscriber2') |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_91_anonymous_sender(self): |
| """ |
| Verify that senders over anonymous links do not block waiting for |
| consumers. |
| """ |
| |
| # no receiver - should not block, return RELEASED |
| msg = Message(body="test_100_anonymous_sender") |
| msg.address = "multicast/test_100_anonymous_sender" |
| tx = AsyncTestSender(address=self.INT_B.listener, |
| count=5, |
| target=None, |
| message=msg, |
| container_id="test_100_anonymous_sender") |
| tx.wait() |
| self.assertEqual(5, tx.released) |
| |
| # now add a receiver: |
| rx = AsyncTestReceiver(address=self.INT_A.listener, |
| source=msg.address) |
| self.INT_B.wait_address(msg.address) |
| tx = AsyncTestSender(address=self.INT_B.listener, |
| count=5, |
| target=None, |
| message=msg, |
| container_id="test_100_anonymous_sender") |
| tx.wait() |
| self.assertEqual(5, tx.accepted) |
| rx.stop() |
| |
| def test_999_check_for_leaks(self): |
| self._check_for_leaks() |
| |
| |
| # |
| # Settlement options for Link attach |
| # |
| |
| class SendPresettled(LinkOption): |
| """ |
| All messages are sent presettled |
| """ |
| def apply(self, link): |
| link.snd_settle_mode = Link.SND_SETTLED |
| link.rcv_settle_mode = Link.RCV_FIRST |
| |
| |
| class SendMixed(LinkOption): |
| """ |
| Messages may be sent unsettled or settled |
| """ |
| def apply(self, link): |
| link.snd_settle_mode = Link.SND_MIXED |
| link.rcv_settle_mode = Link.RCV_FIRST |
| |
| |
| class Link1Ack(LinkOption): |
| """ |
| Messages will be sent unsettled |
| """ |
| def apply(self, link): |
| link.snd_settle_mode = Link.SND_UNSETTLED |
| link.rcv_settle_mode = Link.RCV_FIRST |
| |
| |
| class Link3Ack(LinkOption): |
| """ |
| Messages will be sent unsettled and the receiver will wait for sender to |
| settle first. |
| """ |
| def apply(self, link): |
| link.snd_settle_mode = Link.SND_UNSETTLED |
| link.rcv_settle_mode = Link.RCV_SECOND |
| |
| |
| class MulticastBase(MessagingHandler): |
| """ |
| Common multicast boilerplate code |
| """ |
| def __init__(self, config, count, body, topic=None, **handler_kwargs): |
| super(MulticastBase, self).__init__(**handler_kwargs) |
| self.msg_count = count |
| self.config = config |
| self.topic = topic or "multicast/test" |
| self.body = body |
| |
| # totals |
| self.n_senders = 0; |
| self.n_receivers = 0; |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.n_settled = 0 |
| self.n_accepted = 0 |
| self.n_released = 0 |
| self.n_rejected = 0 |
| self.n_modified = 0 |
| self.n_partial = 0 |
| |
| # all maps indexed by client name: |
| self.receivers = {} |
| self.senders = {} |
| self.r_conns = {} |
| self.s_conns = {} |
| |
| # per receiver |
| self.c_received = {} |
| |
| # count per outcome |
| self.n_outcomes = {} |
| |
| self.error = None |
| self.timers = [] |
| self.reactor = None |
| |
| def done(self): |
| # stop the reactor and clean up the test |
| for t in self.timers: |
| t.cancel() |
| for c_dict in [self.r_conns, self.s_conns]: |
| for conn in c_dict.values(): |
| conn.close() |
| self.r_conns = {} |
| self.s_conns = {} |
| |
| def timeout(self): |
| self.error = "Timeout Expired" |
| self.done() |
| |
| def create_receiver(self, container, conn, source, name): |
| # must override in subclass |
| assert(False) |
| |
| def create_sender(self, container, conn, target, name): |
| # must override in subclass |
| assert(False) |
| |
| def on_start(self, event): |
| self.reactor = event.reactor |
| self.timers.append(self.reactor.schedule(TIMEOUT, TestTimeout(self))) |
| # first create all the receivers first |
| for cfg in self.config: |
| for name in cfg['receivers']: |
| conn = event.container.connect(cfg['router'].listener) |
| assert(name not in self.r_conns) |
| self.r_conns[name] = conn |
| self.create_receiver(event.container, conn, self.topic, name) |
| self.n_receivers += 1 |
| self.c_received[name] = 0 |
| |
| def on_link_opened(self, event): |
| if event.receiver: |
| r_name = event.receiver.name |
| self.receivers[r_name] = event.receiver |
| # create senders after all receivers are opened |
| # makes it easy to check when the clients are ready |
| if len(self.receivers) == self.n_receivers: |
| for cfg in self.config: |
| for name in cfg['senders']: |
| conn = event.container.connect(cfg['router'].listener) |
| assert(name not in self.s_conns) |
| self.s_conns[name] = conn |
| self.create_sender(event.container, conn, self.topic, name) |
| self.n_senders += 1 |
| |
| def on_sendable(self, event): |
| s_name = event.sender.name |
| if s_name not in self.senders: |
| self.senders[s_name] = event.sender |
| if len(self.senders) == self.n_senders: |
| # all senders ready to send, now wait until the routes settle |
| for cfg in self.config: |
| cfg['router'].wait_address(self.topic, |
| subscribers=cfg['subscribers'], |
| remotes=cfg['remotes']) |
| for sender in self.senders.values(): |
| self.do_send(sender) |
| |
| def on_message(self, event): |
| if event.delivery.partial: |
| self.n_partial += 1 |
| else: |
| dlv = event.delivery |
| self.n_received += 1 |
| name = event.link.name |
| self.c_received[name] = 1 + self.c_received.get(name, 0) |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| name = event.link.name |
| self.n_outcomes[Delivery.ACCEPTED] = 1 + self.n_outcomes.get(Delivery.ACCEPTED, 0) |
| |
| def on_released(self, event): |
| # for some reason Proton 'helpfully' calls on_released even though the |
| # delivery state is actually MODIFIED |
| if event.delivery.remote_state == Delivery.MODIFIED: |
| return self.on_modified(event) |
| self.n_released += 1 |
| name = event.link.name |
| self.n_outcomes[Delivery.RELEASED] = 1 + self.n_outcomes.get(Delivery.RELEASED, 0) |
| |
| def on_modified(self, event): |
| self.n_modified += 1 |
| name = event.link.name |
| self.n_outcomes[Delivery.MODIFIED] = 1 + self.n_outcomes.get(Delivery.MODIFIED, 0) |
| |
| def on_rejected(self, event): |
| self.n_rejected += 1 |
| name = event.link.name |
| self.n_outcomes[Delivery.REJECTED] = 1 + self.n_outcomes.get(Delivery.REJECTED, 0) |
| |
| def on_settled(self, event): |
| self.n_settled += 1 |
| |
| def run(self): |
| Container(self).run() |
| |
| # wait until all routers have cleaned up the route tables |
| clean = False |
| while not clean: |
| clean = True |
| for cfg in self.config: |
| mgmt = cfg['router'].management |
| atype = 'org.apache.qpid.dispatch.router.address' |
| addrs = mgmt.query(type=atype).get_dicts() |
| if list(filter(lambda a: a['name'].find(self.topic) != -1, addrs)): |
| clean = False |
| break |
| if not clean: |
| sleep(0.1) |
| |
| |
| class MulticastPresettled(MulticastBase): |
| """ |
| Test multicast forwarding for presettled transfers. |
| Verifies that all messages are settled by the sender |
| """ |
| def __init__(self, config, count, body, settlement_mode): |
| # use a large prefetch to prevent drops |
| super(MulticastPresettled, self).__init__(config, |
| count, |
| body, |
| prefetch=(count * 1024), |
| auto_accept=False, |
| auto_settle=False) |
| self.settlement_mode = settlement_mode |
| self.unexpected_unsettled = 0 |
| self.expected_settled = 0 |
| self.sender_settled = 0 |
| self.done_count = 0 |
| self.unsettled_deliveries = dict() |
| |
| def create_receiver(self, container, conn, source, name): |
| return container.create_receiver(conn, source=source, name=name, |
| options=self.settlement_mode) |
| |
| def create_sender(self, container, conn, target, name): |
| return container.create_sender(conn, target=target, name=name, |
| options=self.settlement_mode) |
| |
| def do_send(self, sender): |
| for i in range(self.msg_count): |
| msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body)) |
| dlv = sender.send(msg) |
| # settled before sending out the message |
| dlv.settle() |
| self.n_sent += 1 |
| |
| def check_if_done(self): |
| # wait for all present receivers to receive all messages |
| # and for all received messages to be settled by the |
| # sender |
| to_rcv = self.n_senders * self.msg_count * self.n_receivers |
| if to_rcv == self.n_received and not self.unsettled_deliveries: |
| self.done() |
| |
| def on_message(self, event): |
| super(MulticastPresettled, self).on_message(event) |
| if event.receiver: |
| if not event.delivery.settled: |
| # it may be that settle will come after on_message |
| # so track that here |
| event.delivery.update(Delivery.ACCEPTED) |
| self.unexpected_unsettled += 1 |
| tag = str(event.delivery.tag) |
| if tag not in self.unsettled_deliveries: |
| self.unsettled_deliveries[tag] = 1 |
| else: |
| self.unsettled_deliveries[tag] += 1 |
| else: |
| self.expected_settled += 1 |
| event.receiver.flow(100) |
| self.check_if_done() |
| |
| def on_settled(self, event): |
| super(MulticastPresettled, self).on_settled(event) |
| if event.receiver: |
| self.sender_settled += 1 |
| tag = str(event.delivery.tag) |
| try: |
| # got a delayed settle |
| self.unsettled_deliveries[tag] -= 1 |
| if self.unsettled_deliveries[tag] == 0: |
| del self.unsettled_deliveries[tag] |
| except KeyError: |
| pass |
| self.check_if_done() |
| |
| |
| class MulticastPresettledRxFail(MulticastPresettled): |
| """ |
| Spontaneously close a receiver or connection on message received |
| """ |
| def __init__(self, config, count, drop_clients, detach, body): |
| super(MulticastPresettledRxFail, self).__init__(config, count, body, SendPresettled()) |
| self.drop_clients = drop_clients |
| self.detach = detach |
| |
| def check_if_done(self): |
| # Verify each receiver got the expected number of messages. |
| # Avoid waiting for dropped receivers. |
| done = True |
| to_rcv = self.n_senders * self.msg_count |
| for name, count in self.c_received.items(): |
| if name not in self.drop_clients: |
| if count != to_rcv: |
| done = False |
| if done: |
| self.done() |
| |
| def on_message(self, event): |
| # close the receiver on arrival of the first message |
| r_name = event.receiver.name |
| if r_name in self.drop_clients: |
| if self.detach: |
| if event.receiver.state & Link.LOCAL_ACTIVE: |
| event.receiver.close() |
| elif event.connection.state & Connection.LOCAL_ACTIVE: |
| event.connection.close() |
| super(MulticastPresettledRxFail, self).on_message(event) |
| |
| |
| class MulticastUnsettled3Ack(MulticastBase): |
| """ |
| Send count messages per sender, senders wait for terminal outcome from |
| receivers before settling |
| """ |
| def __init__(self, config, count, body, outcomes=None): |
| pfetch = int((count + 1)/2) |
| super(MulticastUnsettled3Ack, self).__init__(config, |
| count, |
| body, |
| prefetch=pfetch, |
| auto_accept=False, |
| auto_settle=False) |
| self.outcomes = outcomes or {} |
| |
| def create_receiver(self, container, conn, source, name): |
| return container.create_receiver(conn, source=source, name=name, |
| options=Link3Ack()) |
| |
| def create_sender(self, container, conn, target, name): |
| return container.create_sender(conn, target=target, name=name, |
| options=Link3Ack()) |
| |
| def do_send(self, sender): |
| for i in range(self.msg_count): |
| msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body)) |
| dlv = sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| # receiver: send outcome do not settle |
| super(MulticastUnsettled3Ack, self).on_message(event) |
| if event.delivery.settled: |
| self.error = "Unexpected pre-settled message received!" |
| self.done() |
| return |
| r_name = event.receiver.name |
| outcome = self.outcomes.get(r_name, Delivery.ACCEPTED) |
| event.delivery.update(outcome) |
| if event.receiver.credit == 0: |
| event.receiver.flow(1) |
| |
| def on_settled(self, event): |
| super(MulticastUnsettled3Ack, self).on_settled(event) |
| event.delivery.settle() |
| self.check_if_done() |
| |
| def on_accepted(self, event): |
| super(MulticastUnsettled3Ack, self).on_accepted(event) |
| event.delivery.settle() |
| self.check_if_done() |
| |
| def on_released(self, event): |
| super(MulticastUnsettled3Ack, self).on_released(event) |
| event.delivery.settle() |
| self.check_if_done() |
| |
| def on_modified(self, event): |
| super(MulticastUnsettled3Ack, self).on_modified(event) |
| event.delivery.settle() |
| self.check_if_done() |
| |
| def on_rejected(self, event): |
| super(MulticastUnsettled3Ack, self).on_rejected(event) |
| event.delivery.settle() |
| self.check_if_done() |
| |
| def check_if_done(self): |
| to_send = self.msg_count * self.n_senders |
| to_rcv = to_send * self.n_receivers |
| |
| n_outcomes = (self.n_accepted + self.n_rejected |
| + self.n_modified + self.n_released) |
| |
| # expect senders to see settlement |
| if (self.n_sent == to_send |
| and self.n_received == to_rcv |
| and n_outcomes == to_send |
| and self.n_settled == to_rcv): |
| self.done() |
| |
| |
| class MulticastUnsettled1Ack(MulticastUnsettled3Ack): |
| """ |
| Sender sends unsettled, the receiver sets outcome and immediately settles |
| """ |
| def __init__(self, config, count, body, outcomes=None): |
| super(MulticastUnsettled1Ack, self).__init__(config, |
| count, |
| outcomes) |
| |
| def create_receiver(self, container, conn, source, name): |
| return container.create_receiver(conn, source=source, name=name, |
| options=Link1Ack()) |
| |
| def create_sender(self, container, conn, target, name): |
| return container.create_sender(conn, target=target, name=name, |
| options=Link1Ack()) |
| |
| def on_message(self, event): |
| # receiver: send outcome and settle |
| super(MulticastUnsettled1Ack, self).on_message(event) |
| event.delivery.settle() |
| |
| def check_if_done(self): |
| to_send = self.msg_count * self.n_senders |
| to_rcv = to_send * self.n_receivers |
| |
| n_outcomes = (self.n_accepted + self.n_rejected |
| + self.n_modified + self.n_released) |
| |
| # expect sender to see settlement |
| if (self.n_received == to_rcv |
| and n_outcomes == to_send |
| and self.n_settled == to_send): |
| self.done() |
| |
| |
| class MulticastUnsettledRxFail(MulticastUnsettled3Ack): |
| """ |
| Spontaneously close a receiver or connection on message received |
| """ |
| def __init__(self, config, count, drop_clients, detach, body): |
| super(MulticastUnsettledRxFail, self).__init__(config, count, body) |
| self.drop_clients = drop_clients |
| self.detach = detach |
| |
| def check_if_done(self): |
| # Verify each receiver got the expected number of messages. |
| # Avoid waiting for dropped receivers. |
| done = True |
| to_rcv = self.n_senders * self.msg_count |
| for name, count in self.c_received.items(): |
| if name not in self.drop_clients: |
| if count != to_rcv: |
| done = False |
| if done: |
| self.done() |
| |
| def on_message(self, event): |
| # close the receiver on arrival of the first message |
| r_name = event.receiver.name |
| if r_name in self.drop_clients: |
| if self.detach: |
| if event.receiver.state & Link.LOCAL_ACTIVE: |
| event.receiver.close() |
| elif event.connection.state & Connection.LOCAL_ACTIVE: |
| event.connection.close() |
| super(MulticastUnsettledRxFail, self).on_message(event) |
| |
| |
| class MulticastUnsettled3AckMA(MulticastUnsettled3Ack): |
| """ |
| Try 3 Ack, but with a bunch of user Message Annotations (why not?) |
| """ |
| def __init__(self, config, count, body, outcomes=None): |
| super(MulticastUnsettled3AckMA, self).__init__(config, |
| count, |
| body, |
| outcomes=None) |
| self._huge_ma = { |
| "my-key": "my-data", |
| "my-other-key": "my-other-data", |
| "my-map": { "my-map-key1": "X", |
| "my-map-key2": 0x12, |
| "my-map-key3": "+0123456789" * 101, |
| "my-map-list": [i for i in range(97)] |
| }, |
| "my-last-key": "so long, folks!" |
| } |
| |
| def do_send(self, sender): |
| for i in range(self.msg_count): |
| msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body)) |
| msg.annotations = self._huge_ma |
| dlv = sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_message(self, event): |
| msg = event.message |
| if event.message.annotations != self._huge_ma: |
| self.error = "forwarded message annotations mismatch original" |
| self.done() |
| return |
| super(MulticastUnsettled3AckMA, self).on_message(event) |
| |
| |
| class MulticastCreditBlocked(MessagingHandler): |
| """ |
| Ensure that credit is not provided when there are no consumers present. |
| This client connects to 'address' and creates a sender to 'target'. Once |
| the sending link has opened a short timer is started. It is expected that |
| on_sendable() is NOT invoked before the timer expires. |
| """ |
| def __init__(self, address, target=None, timeout=2, **handler_kwargs): |
| super(MulticastCreditBlocked, self).__init__(**handler_kwargs) |
| self.target = target |
| self.address = address |
| self.time_out = timeout |
| |
| self.conn = None |
| self.sender = None |
| self.timer = None |
| self.error = "Timeout NOT triggered as expected!" |
| |
| def done(self): |
| # stop the reactor and clean up the test |
| if self.timer: |
| self.timer.cancel() |
| if self.conn: |
| self.conn.close() |
| |
| def timeout(self): |
| self.error = None |
| self.done() |
| |
| def on_start(self, event): |
| self.conn = event.container.connect(self.address) |
| self.sender = event.container.create_sender(self.conn, |
| target=self.target, |
| name="McastBlocked") |
| |
| def on_link_opened(self, event): |
| self.timer = event.reactor.schedule(self.time_out, TestTimeout(self)) |
| |
| def on_sendable(self, event): |
| self.error = "Unexpected call to on_sendable()!" |
| self.done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| if __name__ == '__main__': |
| unittest.main(main_module()) |