blob: 486d75b2d0781ea286cd7d0bc7e746ff9e3d4ea9 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
# Test the multicast forwarder
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import sys
from time import sleep
from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton.reactor import LinkOption
from proton import Connection
from proton import Link
from proton import Message
from proton import Delivery
from qpid_dispatch.management.client import Node
from system_test import AsyncTestSender
from system_test import AsyncTestReceiver
from system_test import TestCase
from system_test import Qdrouterd
from system_test import main_module
from system_test import TIMEOUT
from system_test import TestTimeout
from system_test import unittest
MAX_FRAME=1023
LINK_CAPACITY=250
W_THREADS=2
LARGE_PAYLOAD = ("X" * MAX_FRAME) * 19
# check for leaks of the following entities
ALLOC_STATS=["qd_message_t",
"qd_buffer_t",
"qdr_delivery_t"]
class MulticastLinearTest(TestCase):
"""
Verify the multicast forwarding logic across a multihop linear router
configuration
"""
@classmethod
def setUpClass(cls):
"""Start a router"""
super(MulticastLinearTest, cls).setUpClass()
def router(name, mode, extra):
config = [
('router', {'mode': mode,
'id': name,
'allowUnsettledMulticast': 'yes',
'workerThreads': W_THREADS}),
('listener', {'role': 'normal',
'port': cls.tester.get_port(),
'maxFrameSize': MAX_FRAME,
'linkCapacity': LINK_CAPACITY}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]
if extra:
config.extend(extra)
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
return cls.routers[-1]
# configuration:
# two edge routers connected via 2 interior routers.
#
# +-------+ +---------+ +---------+ +-------+
# | EA1 |<==>| INT.A |<==>| INT.B |<==>| EB1 |
# +-------+ +---------+ +---------+ +-------+
#
# Each router has 2 multicast consumers
# EA1 and INT.A each have a multicast sender
cls.routers = []
interrouter_port = cls.tester.get_port()
cls.INTA_edge_port = cls.tester.get_port()
cls.INTB_edge_port = cls.tester.get_port()
router('INT.A', 'interior',
[('listener', {'role': 'inter-router',
'port': interrouter_port}),
('listener', {'role': 'edge', 'port': cls.INTA_edge_port})])
cls.INT_A = cls.routers[0]
cls.INT_A.listener = cls.INT_A.addresses[0]
router('INT.B', 'interior',
[('connector', {'name': 'connectorToA',
'role': 'inter-router',
'port': interrouter_port}),
('listener', {'role': 'edge',
'port': cls.INTB_edge_port})])
cls.INT_B = cls.routers[1]
cls.INT_B.listener = cls.INT_B.addresses[0]
router('EA1', 'edge',
[('listener', {'name': 'rc', 'role': 'route-container',
'port': cls.tester.get_port()}),
('connector', {'name': 'uplink', 'role': 'edge',
'port': cls.INTA_edge_port}),
('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'in'}),
('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'out'})])
cls.EA1 = cls.routers[2]
cls.EA1.listener = cls.EA1.addresses[0]
cls.EA1.route_container = cls.EA1.addresses[1]
router('EB1', 'edge',
[('connector', {'name': 'uplink',
'role': 'edge',
'port': cls.INTB_edge_port,
'maxFrameSize': 1024}),
('listener', {'name': 'rc', 'role': 'route-container',
'port': cls.tester.get_port()}),
('linkRoute', {'pattern': '*.cfg.pattern.#', 'containerId': 'FakeBroker', 'direction': 'in'}),
('linkRoute', {'pattern': '*.cfg.pattern.#', 'containerId': 'FakeBroker', 'direction': 'out'})])
cls.EB1 = cls.routers[3]
cls.EB1.listener = cls.EB1.addresses[0]
cls.EB1.route_container = cls.EB1.addresses[1]
cls.INT_A.wait_router_connected('INT.B')
cls.INT_B.wait_router_connected('INT.A')
cls.EA1.wait_connectors()
cls.EB1.wait_connectors()
# Client topology:
# all routes have 2 receivers
# Edge router EA1 and interior INT_A have a sender each
#
cls.config = [
# edge router EA1:
{'router': cls.EA1,
'senders': ['S-EA1-1'],
'receivers': ['R-EA1-1', 'R-EA1-2'],
'subscribers': 2,
'remotes': 0
},
# Interior router INT_A:
{'router': cls.INT_A,
'senders': ['S-INT_A-1'],
'receivers': ['R-INT_A-1', 'R-INT_A-2'],
'subscribers': 3,
'remotes': 1,
},
# Interior router INT_B:
{'router': cls.INT_B,
'senders': [],
'receivers': ['R-INT_B-1', 'R-INT_B-2'],
'subscribers': 3,
'remotes': 1,
},
# edge router EB1
{'router': cls.EB1,
'senders': [],
'receivers': ['R-EB1-1', 'R-EB1-2'],
'subscribers': 2,
'remotes': 0,
}
]
def _get_alloc_stats(self, router, stats):
# return a map of the current allocator counters for each entity type
# name in stats
#
# 57: END = [{u'heldByThreads': int32(384), u'typeSize': int32(536),
# u'transferBatchSize': int32(64), u'globalFreeListMax': int32(0),
# u'batchesRebalancedToGlobal': int32(774), u'typeName':
# u'qd_buffer_t', u'batchesRebalancedToThreads': int32(736),
# u'totalFreeToHeap': int32(0), u'totalAllocFromHeap': int32(2816),
# u'localFreeListMax': int32(128), u'type':
# u'org.apache.qpid.dispatch.allocator', u'identity':
# u'allocator/qd_buffer_t', u'name': u'allocator/qd_buffer_t'}]
d = dict()
mgmt = router.management
atype = 'org.apache.qpid.dispatch.allocator'
q = mgmt.query(type=atype).get_dicts()
for name in stats:
d[name] = list(filter(lambda a: a['typeName'] == name, q))[0]
return d
def _check_for_leaks(self):
for r in self.routers:
stats = self._get_alloc_stats(r, ALLOC_STATS)
for name in ALLOC_STATS:
# ensure threads haven't leaked
max_allowed = ((W_THREADS + 1)
* stats[name]['localFreeListMax'])
held = stats[name]['heldByThreads']
if held >= (2 * max_allowed):
print("OOPS!!! %s: (%s) - held=%d max=%d\n %s\n"
% (r.config.router_id,
name, held, max_allowed, stats))
sys.stdout.flush()
self.assertFalse(held >= (2 * max_allowed))
#
# run all the negative tests first so that if we screw up the internal
# state of the brokers the positive tests will likely fail
#
def _presettled_large_msg_rx_detach(self, config, count, drop_clients):
# detach receivers during receive
body = " MCAST PRESETTLED LARGE RX DETACH " + LARGE_PAYLOAD
test = MulticastPresettledRxFail(config, count,
drop_clients,
detach=True,
body=body)
test.run()
self.assertEqual(None, test.error)
def test_01_presettled_large_msg_rx_detach(self):
self._presettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2'])
self._presettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1'])
def _presettled_large_msg_rx_close(self, config, count, drop_clients):
# close receiver connections during receive
body = " MCAST PRESETTLED LARGE RX CLOSE " + LARGE_PAYLOAD
test = MulticastPresettledRxFail(config, count,
drop_clients,
detach=False,
body=body)
test.run()
self.assertEqual(None, test.error)
def test_02_presettled_large_msg_rx_close(self):
self._presettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1'])
self._presettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2'])
def _unsettled_large_msg_rx_detach(self, config, count, drop_clients):
# detach receivers during the test
body = " MCAST UNSETTLED LARGE RX DETACH " + LARGE_PAYLOAD
test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=True, body=body)
test.run()
self.assertEqual(None, test.error)
def test_10_unsettled_large_msg_rx_detach(self):
self._unsettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2'])
self._unsettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1'])
def _unsettled_large_msg_rx_close(self, config, count, drop_clients):
# close receiver connections during test
body = " MCAST UNSETTLED LARGE RX CLOSE " + LARGE_PAYLOAD
test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=False, body=body)
test.run()
self.assertEqual(None, test.error)
def test_11_unsettled_large_msg_rx_close(self):
self._unsettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1', ])
self._unsettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2'])
#
# now the positive tests
#
def test_50_presettled(self):
# Simply send a bunch of pre-settled multicast messages
body = " MCAST PRESETTLED "
test = MulticastPresettled(self.config, 10, body, SendPresettled())
test.run()
def test_51_presettled_mixed_large_msg(self):
# Same as above, but large message bodies (mixed sender settle mode)
body = " MCAST MAYBE PRESETTLED LARGE " + LARGE_PAYLOAD
test = MulticastPresettled(self.config, 11, body, SendMixed())
test.run()
self.assertEqual(None, test.error)
def test_52_presettled_large_msg(self):
# Same as above, (pre-settled sender settle mode)
body = " MCAST PRESETTLED LARGE " + LARGE_PAYLOAD
test = MulticastPresettled(self.config, 13, body, SendPresettled())
test.run()
self.assertEqual(None, test.error)
def test_60_unsettled_3ack(self):
# Sender sends unsettled, waits for Outcome from Receiver then settles
# Expect all messages to be accepted
body = " MCAST UNSETTLED "
test = MulticastUnsettled3Ack(self.config, 10, body)
test.run()
self.assertEqual(None, test.error)
self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent)
def test_61_unsettled_3ack_large_msg(self):
# Same as above but with multiframe streaming
body = " MCAST UNSETTLED LARGE " + LARGE_PAYLOAD
test = MulticastUnsettled3Ack(self.config, 11, body=body)
test.run()
self.assertEqual(None, test.error)
self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent)
def _unsettled_3ack_outcomes(self,
config,
count,
outcomes,
expected):
body = " MCAST UNSETTLED 3ACK OUTCOMES " + LARGE_PAYLOAD
test = MulticastUnsettled3Ack(self.config,
count,
body,
outcomes=outcomes)
test.run()
self.assertEqual(None, test.error)
self.assertEqual(test.n_outcomes[expected], test.n_sent)
def test_63_unsettled_3ack_outcomes(self):
# Verify the expected outcome is returned to the sender when the
# receivers return different outcome values. If no outcome is
# specified for a receiver it will default to ACCEPTED
# expect REJECTED if any reject:
self._unsettled_3ack_outcomes(self.config, 3,
{'R-EB1-1': Delivery.REJECTED,
'R-EB1-2': Delivery.MODIFIED,
'R-INT_B-2': Delivery.RELEASED},
Delivery.REJECTED)
self._unsettled_3ack_outcomes(self.config, 3,
{'R-EB1-1': Delivery.REJECTED,
'R-INT_B-2': Delivery.RELEASED},
Delivery.REJECTED)
# expect ACCEPT if no rejects
self._unsettled_3ack_outcomes(self.config, 3,
{'R-EB1-2': Delivery.MODIFIED,
'R-INT_B-2': Delivery.RELEASED},
Delivery.ACCEPTED)
# expect MODIFIED over RELEASED
self._unsettled_3ack_outcomes(self.config, 3,
{'R-EA1-1': Delivery.RELEASED,
'R-EA1-2': Delivery.RELEASED,
'R-INT_A-1': Delivery.RELEASED,
'R-INT_A-2': Delivery.RELEASED,
'R-INT_B-1': Delivery.RELEASED,
'R-INT_B-2': Delivery.RELEASED,
'R-EB1-1': Delivery.RELEASED,
'R-EB1-2': Delivery.MODIFIED},
Delivery.MODIFIED)
# and released only if all released
self._unsettled_3ack_outcomes(self.config, 3,
{'R-EA1-1': Delivery.RELEASED,
'R-EA1-2': Delivery.RELEASED,
'R-INT_A-1': Delivery.RELEASED,
'R-INT_A-2': Delivery.RELEASED,
'R-INT_B-1': Delivery.RELEASED,
'R-INT_B-2': Delivery.RELEASED,
'R-EB1-1': Delivery.RELEASED,
'R-EB1-2': Delivery.RELEASED},
Delivery.RELEASED)
def test_70_unsettled_1ack(self):
# Sender sends unsettled, expects both outcome and settlement from
# receiver before sender settles locally
body = " MCAST UNSETTLED 1ACK "
test = MulticastUnsettled1Ack(self.config, 10, body)
test.run()
self.assertEqual(None, test.error)
def test_71_unsettled_1ack_large_msg(self):
# Same as above but with multiframe streaming
body = " MCAST UNSETTLED 1ACK LARGE " + LARGE_PAYLOAD
test = MulticastUnsettled1Ack(self.config, 10, body)
test.run()
self.assertEqual(None, test.error)
def test_80_unsettled_3ack_message_annotations(self):
body = " MCAST UNSETTLED 3ACK LARGE MESSAGE ANNOTATIONS " + LARGE_PAYLOAD
test = MulticastUnsettled3AckMA(self.config, 10, body)
test.run()
self.assertEqual(None, test.error)
def test_90_credit_no_subscribers(self):
"""
Verify that multicast senders are blocked until a consumer is present.
"""
test = MulticastCreditBlocked(address=self.EA1.listener,
target='multicast/no/subscriber1')
test.run()
self.assertEqual(None, test.error)
test = MulticastCreditBlocked(address=self.INT_A.listener,
target='multicast/no/subscriber2')
test.run()
self.assertEqual(None, test.error)
def test_91_anonymous_sender(self):
"""
Verify that senders over anonymous links do not block waiting for
consumers.
"""
# no receiver - should not block, return RELEASED
msg = Message(body="test_100_anonymous_sender")
msg.address = "multicast/test_100_anonymous_sender"
tx = AsyncTestSender(address=self.INT_B.listener,
count=5,
target=None,
message=msg,
container_id="test_100_anonymous_sender")
tx.wait()
self.assertEqual(5, tx.released)
# now add a receiver:
rx = AsyncTestReceiver(address=self.INT_A.listener,
source=msg.address)
self.INT_B.wait_address(msg.address)
tx = AsyncTestSender(address=self.INT_B.listener,
count=5,
target=None,
message=msg,
container_id="test_100_anonymous_sender")
tx.wait()
self.assertEqual(5, tx.accepted)
rx.stop()
def test_999_check_for_leaks(self):
self._check_for_leaks()
#
# Settlement options for Link attach
#
class SendPresettled(LinkOption):
"""
All messages are sent presettled
"""
def apply(self, link):
link.snd_settle_mode = Link.SND_SETTLED
link.rcv_settle_mode = Link.RCV_FIRST
class SendMixed(LinkOption):
"""
Messages may be sent unsettled or settled
"""
def apply(self, link):
link.snd_settle_mode = Link.SND_MIXED
link.rcv_settle_mode = Link.RCV_FIRST
class Link1Ack(LinkOption):
"""
Messages will be sent unsettled
"""
def apply(self, link):
link.snd_settle_mode = Link.SND_UNSETTLED
link.rcv_settle_mode = Link.RCV_FIRST
class Link3Ack(LinkOption):
"""
Messages will be sent unsettled and the receiver will wait for sender to
settle first.
"""
def apply(self, link):
link.snd_settle_mode = Link.SND_UNSETTLED
link.rcv_settle_mode = Link.RCV_SECOND
class MulticastBase(MessagingHandler):
"""
Common multicast boilerplate code
"""
def __init__(self, config, count, body, topic=None, **handler_kwargs):
super(MulticastBase, self).__init__(**handler_kwargs)
self.msg_count = count
self.config = config
self.topic = topic or "multicast/test"
self.body = body
# totals
self.n_senders = 0;
self.n_receivers = 0;
self.n_sent = 0
self.n_received = 0
self.n_settled = 0
self.n_accepted = 0
self.n_released = 0
self.n_rejected = 0
self.n_modified = 0
self.n_partial = 0
# all maps indexed by client name:
self.receivers = {}
self.senders = {}
self.r_conns = {}
self.s_conns = {}
# per receiver
self.c_received = {}
# count per outcome
self.n_outcomes = {}
self.error = None
self.timers = []
self.reactor = None
def done(self):
# stop the reactor and clean up the test
for t in self.timers:
t.cancel()
for c_dict in [self.r_conns, self.s_conns]:
for conn in c_dict.values():
conn.close()
self.r_conns = {}
self.s_conns = {}
def timeout(self):
self.error = "Timeout Expired"
self.done()
def create_receiver(self, container, conn, source, name):
# must override in subclass
assert(False)
def create_sender(self, container, conn, target, name):
# must override in subclass
assert(False)
def on_start(self, event):
self.reactor = event.reactor
self.timers.append(self.reactor.schedule(TIMEOUT, TestTimeout(self)))
# first create all the receivers first
for cfg in self.config:
for name in cfg['receivers']:
conn = event.container.connect(cfg['router'].listener)
assert(name not in self.r_conns)
self.r_conns[name] = conn
self.create_receiver(event.container, conn, self.topic, name)
self.n_receivers += 1
self.c_received[name] = 0
def on_link_opened(self, event):
if event.receiver:
r_name = event.receiver.name
self.receivers[r_name] = event.receiver
# create senders after all receivers are opened
# makes it easy to check when the clients are ready
if len(self.receivers) == self.n_receivers:
for cfg in self.config:
for name in cfg['senders']:
conn = event.container.connect(cfg['router'].listener)
assert(name not in self.s_conns)
self.s_conns[name] = conn
self.create_sender(event.container, conn, self.topic, name)
self.n_senders += 1
def on_sendable(self, event):
s_name = event.sender.name
if s_name not in self.senders:
self.senders[s_name] = event.sender
if len(self.senders) == self.n_senders:
# all senders ready to send, now wait until the routes settle
for cfg in self.config:
cfg['router'].wait_address(self.topic,
subscribers=cfg['subscribers'],
remotes=cfg['remotes'])
for sender in self.senders.values():
self.do_send(sender)
def on_message(self, event):
if event.delivery.partial:
self.n_partial += 1
else:
dlv = event.delivery
self.n_received += 1
name = event.link.name
self.c_received[name] = 1 + self.c_received.get(name, 0)
def on_accepted(self, event):
self.n_accepted += 1
name = event.link.name
self.n_outcomes[Delivery.ACCEPTED] = 1 + self.n_outcomes.get(Delivery.ACCEPTED, 0)
def on_released(self, event):
# for some reason Proton 'helpfully' calls on_released even though the
# delivery state is actually MODIFIED
if event.delivery.remote_state == Delivery.MODIFIED:
return self.on_modified(event)
self.n_released += 1
name = event.link.name
self.n_outcomes[Delivery.RELEASED] = 1 + self.n_outcomes.get(Delivery.RELEASED, 0)
def on_modified(self, event):
self.n_modified += 1
name = event.link.name
self.n_outcomes[Delivery.MODIFIED] = 1 + self.n_outcomes.get(Delivery.MODIFIED, 0)
def on_rejected(self, event):
self.n_rejected += 1
name = event.link.name
self.n_outcomes[Delivery.REJECTED] = 1 + self.n_outcomes.get(Delivery.REJECTED, 0)
def on_settled(self, event):
self.n_settled += 1
def run(self):
Container(self).run()
# wait until all routers have cleaned up the route tables
clean = False
while not clean:
clean = True
for cfg in self.config:
mgmt = cfg['router'].management
atype = 'org.apache.qpid.dispatch.router.address'
addrs = mgmt.query(type=atype).get_dicts()
if list(filter(lambda a: a['name'].find(self.topic) != -1, addrs)):
clean = False
break
if not clean:
sleep(0.1)
class MulticastPresettled(MulticastBase):
"""
Test multicast forwarding for presettled transfers.
Verifies that all messages are settled by the sender
"""
def __init__(self, config, count, body, settlement_mode):
# use a large prefetch to prevent drops
super(MulticastPresettled, self).__init__(config,
count,
body,
prefetch=(count * 1024),
auto_accept=False,
auto_settle=False)
self.settlement_mode = settlement_mode
self.unexpected_unsettled = 0
self.expected_settled = 0
self.sender_settled = 0
self.done_count = 0
self.unsettled_deliveries = dict()
def create_receiver(self, container, conn, source, name):
return container.create_receiver(conn, source=source, name=name,
options=self.settlement_mode)
def create_sender(self, container, conn, target, name):
return container.create_sender(conn, target=target, name=name,
options=self.settlement_mode)
def do_send(self, sender):
for i in range(self.msg_count):
msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
dlv = sender.send(msg)
# settled before sending out the message
dlv.settle()
self.n_sent += 1
def check_if_done(self):
# wait for all present receivers to receive all messages
# and for all received messages to be settled by the
# sender
to_rcv = self.n_senders * self.msg_count * self.n_receivers
if to_rcv == self.n_received and not self.unsettled_deliveries:
self.done()
def on_message(self, event):
super(MulticastPresettled, self).on_message(event)
if event.receiver:
if not event.delivery.settled:
# it may be that settle will come after on_message
# so track that here
event.delivery.update(Delivery.ACCEPTED)
self.unexpected_unsettled += 1
tag = str(event.delivery.tag)
if tag not in self.unsettled_deliveries:
self.unsettled_deliveries[tag] = 1
else:
self.unsettled_deliveries[tag] += 1
else:
self.expected_settled += 1
event.receiver.flow(100)
self.check_if_done()
def on_settled(self, event):
super(MulticastPresettled, self).on_settled(event)
if event.receiver:
self.sender_settled += 1
tag = str(event.delivery.tag)
try:
# got a delayed settle
self.unsettled_deliveries[tag] -= 1
if self.unsettled_deliveries[tag] == 0:
del self.unsettled_deliveries[tag]
except KeyError:
pass
self.check_if_done()
class MulticastPresettledRxFail(MulticastPresettled):
"""
Spontaneously close a receiver or connection on message received
"""
def __init__(self, config, count, drop_clients, detach, body):
super(MulticastPresettledRxFail, self).__init__(config, count, body, SendPresettled())
self.drop_clients = drop_clients
self.detach = detach
def check_if_done(self):
# Verify each receiver got the expected number of messages.
# Avoid waiting for dropped receivers.
done = True
to_rcv = self.n_senders * self.msg_count
for name, count in self.c_received.items():
if name not in self.drop_clients:
if count != to_rcv:
done = False
if done:
self.done()
def on_message(self, event):
# close the receiver on arrival of the first message
r_name = event.receiver.name
if r_name in self.drop_clients:
if self.detach:
if event.receiver.state & Link.LOCAL_ACTIVE:
event.receiver.close()
elif event.connection.state & Connection.LOCAL_ACTIVE:
event.connection.close()
super(MulticastPresettledRxFail, self).on_message(event)
class MulticastUnsettled3Ack(MulticastBase):
"""
Send count messages per sender, senders wait for terminal outcome from
receivers before settling
"""
def __init__(self, config, count, body, outcomes=None):
pfetch = int((count + 1)/2)
super(MulticastUnsettled3Ack, self).__init__(config,
count,
body,
prefetch=pfetch,
auto_accept=False,
auto_settle=False)
self.outcomes = outcomes or {}
def create_receiver(self, container, conn, source, name):
return container.create_receiver(conn, source=source, name=name,
options=Link3Ack())
def create_sender(self, container, conn, target, name):
return container.create_sender(conn, target=target, name=name,
options=Link3Ack())
def do_send(self, sender):
for i in range(self.msg_count):
msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
dlv = sender.send(msg)
self.n_sent += 1
def on_message(self, event):
# receiver: send outcome do not settle
super(MulticastUnsettled3Ack, self).on_message(event)
if event.delivery.settled:
self.error = "Unexpected pre-settled message received!"
self.done()
return
r_name = event.receiver.name
outcome = self.outcomes.get(r_name, Delivery.ACCEPTED)
event.delivery.update(outcome)
if event.receiver.credit == 0:
event.receiver.flow(1)
def on_settled(self, event):
super(MulticastUnsettled3Ack, self).on_settled(event)
event.delivery.settle()
self.check_if_done()
def on_accepted(self, event):
super(MulticastUnsettled3Ack, self).on_accepted(event)
event.delivery.settle()
self.check_if_done()
def on_released(self, event):
super(MulticastUnsettled3Ack, self).on_released(event)
event.delivery.settle()
self.check_if_done()
def on_modified(self, event):
super(MulticastUnsettled3Ack, self).on_modified(event)
event.delivery.settle()
self.check_if_done()
def on_rejected(self, event):
super(MulticastUnsettled3Ack, self).on_rejected(event)
event.delivery.settle()
self.check_if_done()
def check_if_done(self):
to_send = self.msg_count * self.n_senders
to_rcv = to_send * self.n_receivers
n_outcomes = (self.n_accepted + self.n_rejected
+ self.n_modified + self.n_released)
# expect senders to see settlement
if (self.n_sent == to_send
and self.n_received == to_rcv
and n_outcomes == to_send
and self.n_settled == to_rcv):
self.done()
class MulticastUnsettled1Ack(MulticastUnsettled3Ack):
"""
Sender sends unsettled, the receiver sets outcome and immediately settles
"""
def __init__(self, config, count, body, outcomes=None):
super(MulticastUnsettled1Ack, self).__init__(config,
count,
outcomes)
def create_receiver(self, container, conn, source, name):
return container.create_receiver(conn, source=source, name=name,
options=Link1Ack())
def create_sender(self, container, conn, target, name):
return container.create_sender(conn, target=target, name=name,
options=Link1Ack())
def on_message(self, event):
# receiver: send outcome and settle
super(MulticastUnsettled1Ack, self).on_message(event)
event.delivery.settle()
def check_if_done(self):
to_send = self.msg_count * self.n_senders
to_rcv = to_send * self.n_receivers
n_outcomes = (self.n_accepted + self.n_rejected
+ self.n_modified + self.n_released)
# expect sender to see settlement
if (self.n_received == to_rcv
and n_outcomes == to_send
and self.n_settled == to_send):
self.done()
class MulticastUnsettledRxFail(MulticastUnsettled3Ack):
"""
Spontaneously close a receiver or connection on message received
"""
def __init__(self, config, count, drop_clients, detach, body):
super(MulticastUnsettledRxFail, self).__init__(config, count, body)
self.drop_clients = drop_clients
self.detach = detach
def check_if_done(self):
# Verify each receiver got the expected number of messages.
# Avoid waiting for dropped receivers.
done = True
to_rcv = self.n_senders * self.msg_count
for name, count in self.c_received.items():
if name not in self.drop_clients:
if count != to_rcv:
done = False
if done:
self.done()
def on_message(self, event):
# close the receiver on arrival of the first message
r_name = event.receiver.name
if r_name in self.drop_clients:
if self.detach:
if event.receiver.state & Link.LOCAL_ACTIVE:
event.receiver.close()
elif event.connection.state & Connection.LOCAL_ACTIVE:
event.connection.close()
super(MulticastUnsettledRxFail, self).on_message(event)
class MulticastUnsettled3AckMA(MulticastUnsettled3Ack):
"""
Try 3 Ack, but with a bunch of user Message Annotations (why not?)
"""
def __init__(self, config, count, body, outcomes=None):
super(MulticastUnsettled3AckMA, self).__init__(config,
count,
body,
outcomes=None)
self._huge_ma = {
"my-key": "my-data",
"my-other-key": "my-other-data",
"my-map": { "my-map-key1": "X",
"my-map-key2": 0x12,
"my-map-key3": "+0123456789" * 101,
"my-map-list": [i for i in range(97)]
},
"my-last-key": "so long, folks!"
}
def do_send(self, sender):
for i in range(self.msg_count):
msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
msg.annotations = self._huge_ma
dlv = sender.send(msg)
self.n_sent += 1
def on_message(self, event):
msg = event.message
if event.message.annotations != self._huge_ma:
self.error = "forwarded message annotations mismatch original"
self.done()
return
super(MulticastUnsettled3AckMA, self).on_message(event)
class MulticastCreditBlocked(MessagingHandler):
"""
Ensure that credit is not provided when there are no consumers present.
This client connects to 'address' and creates a sender to 'target'. Once
the sending link has opened a short timer is started. It is expected that
on_sendable() is NOT invoked before the timer expires.
"""
def __init__(self, address, target=None, timeout=2, **handler_kwargs):
super(MulticastCreditBlocked, self).__init__(**handler_kwargs)
self.target = target
self.address = address
self.time_out = timeout
self.conn = None
self.sender = None
self.timer = None
self.error = "Timeout NOT triggered as expected!"
def done(self):
# stop the reactor and clean up the test
if self.timer:
self.timer.cancel()
if self.conn:
self.conn.close()
def timeout(self):
self.error = None
self.done()
def on_start(self, event):
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn,
target=self.target,
name="McastBlocked")
def on_link_opened(self, event):
self.timer = event.reactor.schedule(self.time_out, TestTimeout(self))
def on_sendable(self, event):
self.error = "Unexpected call to on_sendable()!"
self.done()
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())