blob: 69863084da12c93d60bb53be2f052696c5848c39 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from time import sleep
from proton import Condition, Message, Delivery
from system_test import TestCase, Qdrouterd, TIMEOUT, get_link_info, \
get_inter_router_links, has_mobile_dest_in_address_table, PollTimeout, TestTimeout
from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_dispatch.management.client import Node
LARGE_PAYLOAD = ("X" * 1024) * 30
_LINK_STATISTIC_KEYS = set(['unsettledCount',
'undeliveredCount',
'releasedCount',
'presettledCount',
'acceptedCount',
'droppedPresettledCount',
'rejectedCount',
'deliveryCount',
'modifiedCount'])
def get_body(n_sent, large_message=False):
if large_message:
body = {'number': n_sent, 'msg': LARGE_PAYLOAD}
else:
body = {'number': n_sent}
def _link_stats_are_zero(statistics, keys):
"""
Verify that all statistics whose keys are present are zero
"""
for key in keys:
if statistics.get(key) != 0:
return False
return True;
class OneRouterModifiedTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(OneRouterModifiedTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def router_modified_counts(self, large_message=False):
address = self.router.addresses[0]
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_modified_index = outs.attribute_names.index('modifiedDeliveries')
results = outs.results[0]
num_modified_deliveries_pre_test = results[deliveries_modified_index]
num_messages = 10
test = ModifiedDeliveriesTest(address, num_messages, large_message)
test.run()
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
self.assertEqual(results[deliveries_modified_index] - num_modified_deliveries_pre_test, num_messages)
# check link statistics
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'modifiedCount'])))
self.assertEqual(test.sender_stats['deliveryCount'], num_messages)
self.assertEqual(test.sender_stats['modifiedCount'], num_messages)
# receiver just drops the link, so these are not counted as modified
# but unsettled instead
self.assertEqual(test.receiver_stats['deliveryCount'], num_messages)
self.assertEqual(test.receiver_stats['unsettledCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'unsettledCount'])))
def test_one_router_modified_counts(self):
self.router_modified_counts()
def test_one_router_large_message_modified_counts(self):
self.router_modified_counts(True)
class OneRouterRejectedTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(OneRouterRejectedTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def one_router_rejected_counts(self, large_message=False):
address = self.router.addresses[0]
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_rejected_index = outs.attribute_names.index('rejectedDeliveries')
results = outs.results[0]
deliveries_rejected_pre_test = results[deliveries_rejected_index]
num_messages = 10
test = RejectedDeliveriesTest(address, num_messages, large_message)
test.run()
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
self.assertEqual(results[deliveries_rejected_index] - deliveries_rejected_pre_test, num_messages)
# check link statistics
self.assertEqual(test.sender_stats['deliveryCount'], num_messages)
self.assertEqual(test.sender_stats['rejectedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'rejectedCount'])))
self.assertEqual(test.receiver_stats['deliveryCount'], num_messages)
self.assertEqual(test.receiver_stats['rejectedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'rejectedCount'])))
def test_one_router_rejected_counts(self):
self.one_router_rejected_counts()
def test_one_router_large_message_rejected_counts(self):
self.one_router_rejected_counts(True)
class OneRouterReleasedDroppedPresettledTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(OneRouterReleasedDroppedPresettledTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def one_router_released_dropped_count(self, large_message=False):
address = self.router.addresses[0]
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_dropped_presettled_index = outs.attribute_names.index('droppedPresettledDeliveries')
deliveries_released_index = outs.attribute_names.index('releasedDeliveries')
deliveries_presettled_index = outs.attribute_names.index('presettledDeliveries')
results = outs.results[0]
deliveries_dropped_presettled_pre_test = results[deliveries_dropped_presettled_index]
deliveries_released_pre_test = results[deliveries_released_index]
deliveries_presettled_pre_test = results[deliveries_presettled_index]
num_messages = 20
test = ReleasedDroppedPresettledCountTest(address, num_messages, large_message)
test.run()
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
self.assertEqual(results[deliveries_dropped_presettled_index] - deliveries_dropped_presettled_pre_test, 10)
self.assertEqual(results[deliveries_released_index] - deliveries_released_pre_test, 10)
self.assertEqual(results[deliveries_presettled_index] - deliveries_presettled_pre_test, 10)
# check link statistics
self.assertEqual(test.sender_stats['deliveryCount'], test.n_sent)
self.assertEqual(test.sender_stats['releasedCount'], 10)
self.assertEqual(test.sender_stats['presettledCount'], 10)
self.assertEqual(test.sender_stats['droppedPresettledCount'], 10)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'releasedCount',
'presettledCount',
'droppedPresettledCount'])))
def test_one_router_released_dropped_counts(self):
self.one_router_released_dropped_count()
def test_one_router_large_message_released_dropped_counts(self):
self.one_router_released_dropped_count(True)
class TwoRouterReleasedDroppedPresettledTest(TestCase):
@classmethod
def setUpClass(cls):
super(TwoRouterReleasedDroppedPresettledTest, cls).setUpClass()
listen_port_1 = cls.tester.get_port()
listen_port_2 = cls.tester.get_port()
listen_port_inter_router = cls.tester.get_port()
config_1 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'A'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
])
config_2 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}),
])
cls.routers = []
cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
cls.routers[1].wait_router_connected('A')
def two_router_released_dropped_counts(self, large_message=False):
address = self.routers[0].addresses[0]
# Send presettled and settled messages to router 1.
# Make sure the hello messages (which are presettled dont show up in the counts
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_dropped_presettled_index = outs.attribute_names.index('droppedPresettledDeliveries')
deliveries_released_index = outs.attribute_names.index('releasedDeliveries')
deliveries_presettled_index = outs.attribute_names.index('presettledDeliveries')
results = outs.results[0]
deliveries_dropped_presettled_pre_test = results[deliveries_dropped_presettled_index]
deliveries_released_pre_test = results[deliveries_released_index]
deliveries_presettled_pre_test = results[deliveries_presettled_index]
num_messages = 20
test = ReleasedDroppedPresettledCountTest(address, num_messages, large_message)
test.run()
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
self.assertEqual(results[deliveries_dropped_presettled_index] - deliveries_dropped_presettled_pre_test, 10)
self.assertEqual(results[deliveries_released_index] - deliveries_released_pre_test, 10)
self.assertEqual(results[deliveries_presettled_index] - deliveries_presettled_pre_test, 10)
# check link statistics
self.assertEqual(test.sender_stats['deliveryCount'], test.n_sent)
self.assertEqual(test.sender_stats['releasedCount'], 10)
self.assertEqual(test.sender_stats['presettledCount'], 10)
self.assertEqual(test.sender_stats['droppedPresettledCount'], 10)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'releasedCount',
'presettledCount',
'droppedPresettledCount'])))
def test_two_router_released_dropped_counts(self):
self.two_router_released_dropped_counts()
def test_two_router_large_message_released_dropped_counts(self):
self.two_router_released_dropped_counts(True)
class AddressCheckerTimeout ( object ):
def __init__(self, parent):
self.parent = parent
def on_timer_task(self, event):
self.parent.address_check_timeout()
class CounterCheckerTimeout ( object ):
def __init__(self, parent):
self.parent = parent
def on_timer_task(self, event):
self.parent.count_check_timeout()
class LargePresettledLinkCounterTest(MessagingHandler):
def __init__(self, sender_addr, receiver_addr):
super(LargePresettledLinkCounterTest, self).__init__()
self.timer = None
self.sender_conn = None
self.receiver_conn = None
self.receiver = None
self.error = None
self.n_sent = 0
self.n_received = 0
self.num_messages = 25
self.sender_addr = sender_addr
self.receiver_addr = receiver_addr
self.dest = "LargePresettledLinkCounterTest"
self.links = None
self.success = False
self.address_check_timer = None
self.container = None
self.num_attempts = 0
self.reactor = None
self.done = False
def check_if_done(self):
if self.done:
# Step 5: All messages have been received by receiver.
# Check the presettled count on the inter-router link of
# Router B (where the receiver is attached).
self.links = get_inter_router_links(self.receiver_addr)
for link in self.links:
# The self.num_messages + 1 is because before this test started the presettledCount was 1
if link.get("linkDir") == "in" and link.get("presettledCount") == self.num_messages + 1:
self.success = True
break
self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
def address_check_timeout(self):
if has_mobile_dest_in_address_table(self.sender_addr, self.dest):
# Step 3: The address has propagated to Router A. Now attach a sender
# to router A.
self.sender_conn = self.container.connect(self.sender_addr)
self.sender = self.container.create_sender(self.sender_conn,
self.dest,
name='SenderA')
else:
if self.num_attempts < 2:
self.address_check_timer = self.reactor.schedule(2,
AddressCheckerTimeout(self))
self.num_attempts += 1
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d " % (self.n_sent, self.n_received)
self.sender_conn.close()
self.receiver_conn.close()
def on_start(self, event):
self.container = event.container
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
# Step 1: Create a receiver with name ReceiverA to address LargePresettledLinkCounterTest
# This receiver is attached to router B. Later a sender will be
# created which will be connected to Router A. The sender will send
# on the same address that the receiver is receiving on.
self.receiver_conn = event.container.connect(self.receiver_addr)
self.receiver = event.container.create_receiver(self.receiver_conn,
self.dest,
name='ReceiverA')
def on_link_opened(self, event):
self.reactor = event.reactor
if event.receiver:
# Step 2: The receiver link has been opened.
# Give 2 seconds for the address to propagate to the other router (Router A)
self.address_check_timer = event.reactor.schedule(2, AddressCheckerTimeout(self))
self.num_attempts += 1
def on_sendable(self, event):
# Step 4: Send self.num_messages multi-frame large pre-settled messages.
# These messages will travel over inter-router link to Router B.
if self.n_sent < self.num_messages:
msg = Message(body=LARGE_PAYLOAD)
dlv = self.sender.send(msg)
# We are sending a pre-settled large multi frame message.
dlv.settle()
self.n_sent += 1
def on_message(self, event):
if self.receiver == event.receiver:
self.n_received += 1
if self.n_received == self.num_messages:
self.done = True
self.check_if_done()
def run(self):
Container(self).run()
class LargePresettledReleasedLinkCounterTest(MessagingHandler):
def __init__(self, sender_addr, receiver_addr):
super(LargePresettledReleasedLinkCounterTest, self).__init__(prefetch=0)
self.sender_addr = sender_addr
self.receiver_addr = receiver_addr
self.dest = "LargePresettledReleasedLinkCounterTest"
self.receiver_dropoff_count = 50
self.num_messages = 200
self.num_attempts = 0
self.n_sent= 0
self.done = False
self.n_received = 0
self.count_check_timer = None
self.success = False
self.links = None
self.receiver_conn_closed = False
def check_if_done(self):
# Step 6:
# Check the counts on the inter-router link of
# Router B (where the receiver is attached). There
# should be no released or modified messages.
self.links = get_inter_router_links(self.receiver_addr)
for link in self.links:
# We don't know how many deliveries got from one side of the
# inter-router link to the other but there should at least be as
# many as was sent to the receiver
if link.get("linkDir") == "in" \
and link.get("presettledCount") > self.receiver_dropoff_count \
and link.get("deliveryCount") > self.receiver_dropoff_count \
and link.get("releasedCount") == 0\
and link.get("modifiedCount") == 0:
self.success = True
break
self.sender_conn.close()
self.timer.cancel()
def count_check_timeout(self):
self.check_if_done()
def address_check_timeout(self):
if has_mobile_dest_in_address_table(self.sender_addr, self.dest):
# Step 3: The address has propagated to Router A. Now attach a sender
# to router A.
self.sender_conn = self.container.connect(self.sender_addr)
self.sender = self.container.create_sender(self.sender_conn,
self.dest,
name='SenderA')
else:
if self.num_attempts < 2:
self.address_check_timer = self.reactor.schedule(2, AddressCheckerTimeout(self))
self.num_attempts += 1
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d " % (self.n_sent, self.n_received)
self.sender_conn.close()
if not self.receiver_conn_closed:
self.receiver_conn.close()
def on_start(self, event):
self.container = event.container
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
# Step 1: Create a receiver with name ReceiverA to address LargePresettledReleasedLinkCounterTest
# This receiver is attached to router B. Later a sender will be
# created which will be connected to Router A. The sender will send
# on the same address that the receiver is receiving on.
self.receiver_conn = event.container.connect(self.receiver_addr)
self.receiver = event.container.create_receiver(self.receiver_conn,
self.dest,
name='ReceiverA')
self.receiver.flow(self.receiver_dropoff_count)
def on_link_opened(self, event):
self.reactor = event.reactor
if event.receiver:
# Step 2: The receiver link has been opened.
# Give 2 seconds for the address to propagate to the other router (Router A)
self.address_check_timer = event.reactor.schedule(2, AddressCheckerTimeout(self))
self.num_attempts += 1
def on_sendable(self, event):
# Step 4: Send self.num_messages multi-frame large pre-settled messages.
# These messages will travel over inter-router link to Router B.
while self.n_sent < self.num_messages:
msg = Message(body=LARGE_PAYLOAD)
dlv = self.sender.send(msg)
# We are sending a pre-settled large multi frame message.
dlv.settle()
self.n_sent += 1
def on_message(self, event):
if self.receiver == event.receiver and not self.done:
self.n_received += 1
# Step 5: The receiver receives only 50 messages out of the 200
# messages and drops out.
if self.n_received == self.receiver_dropoff_count:
self.done = True
self.receiver_conn.close()
self.receiver_conn_closed = True
self.count_check_timer = event.reactor.schedule(3, CounterCheckerTimeout(self))
def run(self):
Container(self).run()
class TwoRouterLargeMessagePresettledCountTest(TestCase):
@classmethod
def setUpClass(cls):
super(TwoRouterLargeMessagePresettledCountTest, cls).setUpClass()
listen_port_1 = cls.tester.get_port()
listen_port_2 = cls.tester.get_port()
listen_port_inter_router = cls.tester.get_port()
config_1 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'A'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
])
config_2 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}),
])
cls.routers = []
cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
cls.routers[1].wait_router_connected('A')
def test_verify_inter_router_presettled_count_DISPATCH_1540(self):
sender_address = self.routers[0].addresses[0]
receiver_address = self.routers[1].addresses[0]
# Sends presettled large messages across routers and checks
# the pre-settled count on the inter-router link of the downstream
# router (i.e. that to which receiver is attached)
# This test will fail if DISPATCH-1540 is not fixed since the
# pre-settled count will show zero
test = LargePresettledLinkCounterTest(sender_address, receiver_address)
test.run()
self.assertTrue(test.success)
class TwoRouterLargeMessagePresettledReleasedCountTest(TestCase):
@classmethod
def setUpClass(cls):
super(TwoRouterLargeMessagePresettledReleasedCountTest, cls).setUpClass()
listen_port_1 = cls.tester.get_port()
listen_port_2 = cls.tester.get_port()
listen_port_inter_router = cls.tester.get_port()
config_1 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'A'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
])
config_2 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}),
])
cls.routers = []
cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
cls.routers[1].wait_router_connected('A')
def test_verify_inter_router_presettled_released_count_DISPATCH_1541(self):
# This test sends presettled large messages across routers. A sender is on
# router A and a receiver on B. The sender sends 200 messages, the receiver
# receives 50 messages and goes away by closing its connection. There should be no released or
# modified messages on the incoming inter-router link on Router B
# This test will fail without the patch to DISPATCH-1541
sender_address = self.routers[0].addresses[0]
receiver_address = self.routers[1].addresses[0]
test = LargePresettledReleasedLinkCounterTest(sender_address, receiver_address)
test.run()
self.assertTrue(test.success)
class LinkRouteIngressEgressTransitTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(LinkRouteIngressEgressTransitTest, cls).setUpClass()
def router(name, connection):
config = [
('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
] + connection
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
a_listener_port = cls.tester.get_port()
b_listener_port = cls.tester.get_port()
c_listener_port = cls.tester.get_port()
test_tag_listener_port = cls.tester.get_port()
router('A',
[
('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
])
router('B',
[
('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'name': 'test-tag', 'role': 'route-container', 'host': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'routerC', 'role': 'inter-router', 'host': '0.0.0.0', 'port': c_listener_port}),
('linkRoute', {'prefix': 'pulp.task', 'connection': 'broker', 'direction': 'in'}),
('linkRoute', {'prefix': 'pulp.task', 'connection': 'broker', 'direction': 'out'}),
]
)
router('C',
[
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': c_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('linkRoute', {'prefix': 'pulp.task', 'direction': 'in'}),
('linkRoute', {'prefix': 'pulp.task', 'direction': 'out'}),
]
)
# Wait for the routers to locate each other, and for route propagation
# to settle
cls.routers[2].wait_router_connected('QDR.B')
cls.routers[1].wait_router_connected('QDR.C')
cls.routers[2].wait_address("pulp.task", remotes=1, delay=3, count=2)
# This is not a classic router network in the sense that QDR.A and D are acting as brokers. We allow a little
# bit more time for the routers to stabilize.
sleep(2)
def link_route_ingress_egress_transit_counts(self, large_message=False):
address1 = self.routers[2].addresses[0]
address2 = self.routers[2].addresses[0]
local_node = Node.connect(address1, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
deliveries_transit_index = outs.attribute_names.index('deliveriesTransit')
results = outs.results[0]
pre_ingress_count = results[deliveries_ingress_index]
pre_egress_count = results[deliveries_egress_index]
pre_transit_count = results[deliveries_transit_index]
num_messages = 10
# Send and receive on the same router, router C
test = IngressEgressTransitLinkRouteTest(address1, address2, num_messages, large_message=large_message)
test.run()
local_node = Node.connect(address1, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
post_ingress_count = results[deliveries_ingress_index]
post_egress_count = results[deliveries_egress_index]
post_transit_count = results[deliveries_transit_index]
# 10 messages entered the router, and 10 messages were echoed by router A and 3 mgmt requests
self.assertEqual(post_ingress_count - pre_ingress_count, 23)
# 10 messages + 3 mgmt request
self.assertEqual(post_egress_count - pre_egress_count, 13)
# 10 messages went out this router
self.assertEqual(post_transit_count - pre_transit_count, 10)
# Check link statistics
self.assertEqual(test.sender_stats['deliveryCount'], 10)
self.assertEqual(test.sender_stats['acceptedCount'], 10)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
self.assertEqual(test.receiver_stats['deliveryCount'], 10)
self.assertEqual(test.receiver_stats['acceptedCount'], 10)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
def test_link_route_ingress_egress_transit_counts(self):
self.link_route_ingress_egress_transit_counts()
def test_link_route_large_message_ingress_egress_transit_counts(self):
self.link_route_ingress_egress_transit_counts(True)
class TwoRouterIngressEgressTest(TestCase):
@classmethod
def setUpClass(cls):
super(TwoRouterIngressEgressTest, cls).setUpClass()
listen_port_1 = cls.tester.get_port()
listen_port_2 = cls.tester.get_port()
listen_port_inter_router = cls.tester.get_port()
config_1 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'A'}),
('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
])
config_2 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router}),
])
cls.routers = []
cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
cls.routers[1].wait_router_connected('A')
def two_router_ingress_egress_counts(self, large_message=False):
in_router_addr = self.routers[0].addresses[0]
out_router_addr = self.routers[1].addresses[0]
# Gather the values for deliveries_ingress and deliveries_egress before running the test.
local_node = Node.connect(in_router_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
results = outs.results[0]
pre_deliveries_ingresss = results[deliveries_ingress_index]
local_node = Node.connect(out_router_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
deliveries_accepted_index = outs.attribute_names.index('acceptedDeliveries')
results = outs.results[0]
pre_deliveries_egress = results[deliveries_egress_index]
pre_deliveries_accepted = results[deliveries_accepted_index]
# Now run the test. At the end of the test each router will be queried
# for the per-link stats
num_messages = 10
test = IngressEgressTwoRouterTest(in_router_addr, out_router_addr, num_messages, large_message=large_message)
test.run()
# Gather the values for deliveries_ingress and deliveries_egress after running the test.
local_node = Node.connect(in_router_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
post_deliveries_ingresss = results[deliveries_ingress_index]
local_node = Node.connect(out_router_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
post_deliveries_egress = results[deliveries_egress_index]
post_deliveries_accepted = results[deliveries_accepted_index]
accepted_deliveries_diff = post_deliveries_accepted - pre_deliveries_accepted
# 12 = 10 msgs + 2 mgmt requests
self.assertEqual(post_deliveries_ingresss - pre_deliveries_ingresss, 12)
self.assertEqual(post_deliveries_egress - pre_deliveries_egress, 12)
# check the link statistics
self.assertEqual(test.sender_stats['deliveryCount'], num_messages)
self.assertEqual(test.sender_stats['acceptedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
self.assertEqual(test.receiver_stats['deliveryCount'], num_messages)
self.assertEqual(test.receiver_stats['acceptedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
# The management requests are counted in the acceptedDeliveries, so it is difficult to measure the
# exact number of accepted deliveries at this point in time. But it must at least be 10 since
# we know for sure from the test that the 10 dispositions related to the 10 sent messages
# were definitely received
self.assertTrue(accepted_deliveries_diff >= num_messages)
def test_two_router_ingress_egress_counts(self):
self.two_router_ingress_egress_counts()
def test_two_router_large_message_ingress_egress_counts(self):
self.two_router_ingress_egress_counts(True)
class OneRouterIngressEgressTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(OneRouterIngressEgressTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def one_router_ingress_egress_counts(self, large_message=False):
address = self.router.addresses[0]
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
results = outs.results[0]
deliveries_ingress_pre_test = results[deliveries_ingress_index]
deliveries_egress_pre_test = results[deliveries_egress_index]
num_messages = 10
test = IngressEgressOneRouterTest(address, num_messages, large_message=large_message)
test.run()
outs = local_node.query(type='org.apache.qpid.dispatch.router')
results = outs.results[0]
# 13 = ten msgs + 3 mgmt requests
self.assertEqual(results[deliveries_ingress_index] - deliveries_ingress_pre_test, 13)
# 12 = ten msgs + 2 mgmt requests
self.assertEqual(results[deliveries_egress_index] - deliveries_egress_pre_test, 13)
# check the link statistics
self.assertEqual(test.sender_stats['deliveryCount'], num_messages)
self.assertEqual(test.sender_stats['acceptedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
self.assertEqual(test.receiver_stats['deliveryCount'], num_messages)
self.assertEqual(test.receiver_stats['acceptedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
def test_one_router_ingress_egress_counts(self):
self.one_router_ingress_egress_counts()
def test_one_router_large_message_ingress_egress_counts(self):
self.one_router_ingress_egress_counts(True)
class RouteContainerEgressCount(TestCase):
@classmethod
def setUpClass(cls):
super(RouteContainerEgressCount, cls).setUpClass()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
#
# Create a general-purpose listener for sending and receiving deliveries
#
('listener', {'port': cls.tester.get_port()}),
# Create a route-container listener and give it a name myListener.
# Later on we will create an autoLink which has a connection property of myListener.
#
('listener', {'role': 'route-container', 'name': 'myListener', 'port': cls.tester.get_port()}),
#
# Note here that the connection is set to a previously declared 'myListener'
#
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'in'}),
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'out'}),
])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def route_container_egress(self , large_message=False):
regular_addr = self.router.addresses[0]
route_container_addr = self.router.addresses[1]
num_messages = 10
local_node = Node.connect(regular_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_egress_route_container_index = outs.attribute_names.index('deliveriesEgressRouteContainer')
results = outs.results[0]
deliveries_egress_pre_test = results[deliveries_egress_route_container_index]
test = RouteContainerEgressTest(route_container_addr, regular_addr, num_messages, large_message=large_message)
test.run()
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_egress_route_container_index = outs.attribute_names.index('deliveriesEgressRouteContainer')
results = outs.results[0]
# 11 = 10 msgs + 1 mgmt msg
self.assertEqual(results[deliveries_egress_route_container_index] - deliveries_egress_pre_test, 11)
# check link statistics
self.assertEqual(test.sender_stats['deliveryCount'], num_messages)
self.assertEqual(test.sender_stats['acceptedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
self.assertEqual(test.receiver_stats['deliveryCount'], num_messages)
self.assertEqual(test.receiver_stats['acceptedCount'], num_messages)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
def test_route_container_egress_count(self):
self.route_container_egress()
def test_route_container_large_message_egress_count(self):
self.route_container_egress(True)
class OneRouterLinkCountersTest(TestCase):
"""
A set of tests that validate link-level counters
"""
CREDIT = 20 # default issued by test receiver client
COUNT = 40 # default total msgs the sender client generates
@classmethod
def setUpClass(cls):
# create one router
super(OneRouterLinkCountersTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'LinkCounters'}),
('listener', {'port': listen_port,
'authenticatePeer': False,
'saslMechanisms': 'ANONYMOUS',
'linkCapacity': cls.CREDIT})])
cls.router = cls.tester.qdrouterd(name="LinkCounters", config=config, wait=True)
class LinkCountersTest(MessagingHandler):
"""
Create 1 sender and 1 receiver to router_addr. Send count messages.
The test ends when the receivers deliveryCount reaches rx_limit.
Explicitly set the receiver credit and whether to sender sends
presettled or unsettled messages.
"""
def __init__(self, router_addr, count=None, rx_limit=None,
credit=None, presettled=False, outcome=None,
large_message=False):
super(OneRouterLinkCountersTest.LinkCountersTest,
self).__init__(auto_accept=False,
auto_settle=False,
prefetch=0)
self.router_addr = router_addr
self.presettled = presettled
self.outcome = outcome
self.count = OneRouterLinkCountersTest.COUNT \
if count is None else count
self.credit = OneRouterLinkCountersTest.COUNT \
if credit is None else credit
self.rx_limit = OneRouterLinkCountersTest.COUNT \
if rx_limit is None else rx_limit
self.sent = 0
self.timer = 0
self.poll_timer = None
self.conn = None
self.sender_stats = None
self.receiver_stats = None
self.large_message = large_message
def timeout(self):
self._cleanup()
def _cleanup(self):
if self.conn:
self.conn.close()
self.conn = None
if self.poll_timer:
self.poll_timer.cancel()
self.poll_timer = None
if self.timer:
self.timer.cancel()
self.timer = None
def poll_timeout(self):
"""
Periodically check the deliveryCount on the receiver. Once it
reaches rx_limit the test is complete: gather link statistics
before closing the clients
"""
li = get_link_info("Rx_Test01", self.router_addr)
if li and li['deliveryCount'] == self.rx_limit:
self.receiver_stats = li
self.sender_stats = get_link_info("Tx_Test01", self.router_addr)
self._cleanup()
else:
self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self))
def on_start(self, event):
self.reactor = event.reactor
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))
self.conn = event.container.connect(self.router_addr)
self.receiver = event.container.create_receiver(self.conn,
source="Test01",
name='Rx_Test01')
self.receiver.flow(self.credit)
self.sender = event.container.create_sender(self.conn,
target="Test01",
name="Tx_Test01")
def on_sendable(self, event):
if self.sent < self.count:
if self.large_message:
dlv = self.sender.send(Message(body=LARGE_PAYLOAD))
else:
dlv = self.sender.send(Message(body="Test01"))
if self.presettled:
dlv.settle()
self.sent += 1
def on_message(self, event):
if self.outcome:
event.delivery.update(self.outcome)
event.delivery.settle()
# otherwise just drop it
def run(self):
Container(self).run()
def verify_released(self, large_message=False):
"""
Verify the link released count by releasing all received messages
"""
test = self.LinkCountersTest(self.router.addresses[0],
outcome=Delivery.RELEASED,
large_message=large_message)
test.run()
self.assertEqual(test.receiver_stats['deliveryCount'], self.COUNT)
self.assertEqual(test.receiver_stats['releasedCount'], self.COUNT)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'releasedCount'])))
self.assertEqual(test.sender_stats['deliveryCount'], self.COUNT)
self.assertEqual(test.sender_stats['releasedCount'], self.COUNT)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'releasedCount'])))
def verify_unsettled_count(self, large_message=False):
"""
Verify the link unsettled count by granting less credit than required
by the sender
"""
test = self.LinkCountersTest(self.router.addresses[0],
presettled=False,
count=self.COUNT,
rx_limit=self.CREDIT,
credit=self.CREDIT,
large_message=large_message)
test.run()
# expect the receiver to get rx_limit worth of unsettled deliveries
self.assertEqual(test.receiver_stats['deliveryCount'], self.CREDIT)
self.assertEqual(test.receiver_stats['unsettledCount'], self.CREDIT)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'unsettledCount'])))
# expect sender only to be able to send as much as credit
self.assertEqual(test.sender_stats['deliveryCount'], self.CREDIT)
self.assertEqual(test.sender_stats['unsettledCount'], self.CREDIT)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'unsettledCount'])))
def verify_presettled_count(self, large_message=False):
"""
Verify the presettled dropped count link counter by exhausting credit
before sending is complete
"""
limit = self.CREDIT//2 # 1/2 the capacity given the sender
test = self.LinkCountersTest(self.router.addresses[0],
presettled=True,
count=self.COUNT,
rx_limit=limit,
credit=limit,
large_message=large_message)
test.run()
# since these are presettled the sender should have credit
# replenished by the router after each message.
self.assertEqual(test.sender_stats['deliveryCount'], self.COUNT)
self.assertEqual(test.sender_stats['presettledCount'], self.COUNT)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'presettledCount'])))
# since credit is fixed at limit, exactly that number of msgs can be received
self.assertEqual(test.receiver_stats['deliveryCount'], limit)
# verify that some messages were dropped and some are stuck on the
# undelivered list
self.assertTrue(test.receiver_stats['undeliveredCount'] > 0)
self.assertTrue(test.receiver_stats['droppedPresettledCount'] > 0)
# expect that whatever was not dropped was delivered
self.assertEqual(test.receiver_stats['deliveryCount'],
(test.receiver_stats['presettledCount']
- test.receiver_stats['droppedPresettledCount']))
# expect the sum of dropped+delivered+undelivered accounts for all
# messages sent
self.assertEqual(self.COUNT,
(test.receiver_stats['deliveryCount']
+ test.receiver_stats['undeliveredCount']
+ test.receiver_stats['droppedPresettledCount']))
# all other counters must be zero
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'undeliveredCount',
'droppedPresettledCount',
'presettledCount'])))
def verify_one_credit_accepted(self, large_message=False):
"""
Verify counters on a credit-blocked link
"""
test = self.LinkCountersTest(self.router.addresses[0],
outcome=Delivery.ACCEPTED,
rx_limit=1,
credit=1)
test.run()
# expect only 1 delivery, an link credit worth of queued up messages
self.assertEqual(test.receiver_stats['deliveryCount'], 1)
self.assertEqual(test.receiver_stats['acceptedCount'], 1)
self.assertEqual(test.receiver_stats['undeliveredCount'], self.CREDIT)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'undeliveredCount',
'acceptedCount'])))
# expect that one message will be delivered, then link capacity
# messages will be enqueued internally
self.assertEqual(test.sender_stats['unsettledCount'], self.CREDIT)
self.assertEqual(test.sender_stats['deliveryCount'], self.CREDIT + 1)
self.assertEqual(test.sender_stats['acceptedCount'], 1)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS
- set(['deliveryCount',
'unsettledCount',
'acceptedCount'])))
def test_01_presettled(self):
self.verify_presettled_count()
def test_02_large_mesage_presettled(self):
self.verify_presettled_count(True)
def test_03_unsettled(self):
self.verify_presettled_count()
def test_04_large_message_unsettled(self):
self.verify_presettled_count(True)
def test_05_released(self):
self.verify_released()
def test_06_large_message_released(self):
self.verify_released(True)
def test_07_one_credit_accepted(self):
self.verify_one_credit_accepted()
def test_08_large_message_one_credit_accepted(self):
self.verify_one_credit_accepted(True)
class RouteContainerIngressCount(TestCase):
@classmethod
def setUpClass(cls):
super(RouteContainerIngressCount, cls).setUpClass()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
#
# Create a general-purpose listener for sending and receiving deliveries
#
('listener', {'port': cls.tester.get_port()}),
# Create a route-container listener and give it a name myListener.
# Later on we will create an autoLink which has a connection property of myListener.
#
('listener', {'role': 'route-container', 'name': 'myListener', 'port': cls.tester.get_port()}),
#
# Note here that the connection is set to a previously declared 'myListener'
#
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'in'}),
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'out'}),
])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def test_route_container_ingress(self):
regular_addr = self.router.addresses[0]
route_container_addr = self.router.addresses[1]
test = RouteContainerIngressTest(route_container_addr, regular_addr)
test.run()
local_node = Node.connect(regular_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_route_container_index = outs.attribute_names.index('deliveriesIngressRouteContainer')
results = outs.results[0]
# 22 = 20 msgs + 2 mgmt msgs
self.assertEqual(results[deliveries_ingress_route_container_index], 22)
# check link statistics
self.assertEqual(test.sender_stats['deliveryCount'], 10)
self.assertEqual(test.sender_stats['acceptedCount'], 10)
self.assertTrue(_link_stats_are_zero(test.sender_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
self.assertEqual(test.sender1_stats['deliveryCount'], 10)
self.assertEqual(test.sender1_stats['acceptedCount'], 10)
self.assertTrue(_link_stats_are_zero(test.sender1_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
self.assertEqual(test.receiver_stats['deliveryCount'], 20)
self.assertEqual(test.receiver_stats['acceptedCount'], 20)
self.assertTrue(_link_stats_are_zero(test.receiver_stats,
_LINK_STATISTIC_KEYS - set(['deliveryCount',
'acceptedCount'])))
class IngressEgressTwoRouterTest(MessagingHandler):
def __init__(self, sender_address, receiver_address, num_messages, large_message=False):
super(IngressEgressTwoRouterTest, self).__init__()
self.sender = None
self.receiver = None
self.conn_sender = None
self.conn_recv = None
self.timer = None
self.dest = 'examples'
self.sender_address = sender_address
self.receiver_address = receiver_address
self.n_sent = 0
self.n_received = 0
self.num_messages = num_messages
self.start = False
self.n_accept = 0
self.sender_stats = None
self.receiver_stats = None
self.done = False
self.large_message = large_message
def timeout(self):
self.conn_sender.close()
self.conn_recv.close()
def check_if_done(self):
if not self.done and self.num_messages == self.n_received and self.n_accept == self.num_messages:
self.done = True
self.sender_stats = get_link_info('Tx_IngressEgressTwoRouterTest',
self.sender_address)
self.receiver_stats = get_link_info('Rx_IngressEgressTwoRouterTest',
self.receiver_address)
self.conn_sender.close()
self.conn_recv.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn_recv = event.container.connect(self.receiver_address)
self.receiver = event.container.create_receiver(self.conn_recv,
source=self.dest,
name='Rx_IngressEgressTwoRouterTest')
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body=get_body(self.n_sent, self.large_message))
self.sender.send(msg)
self.n_sent += 1
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.start = True
self.conn_sender = event.container.connect(self.sender_address)
self.sender = event.container.create_sender(self.conn_sender,
target=self.dest,
name='Tx_IngressEgressTwoRouterTest')
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
if event.sender:
self.n_accept += 1
self.check_if_done()
def run(self):
Container(self).run()
class IngressEgressOneRouterTest(MessagingHandler):
def __init__(self, address, num_messages, large_message=False):
super(IngressEgressOneRouterTest, self).__init__()
self.sender = None
self.receiver = None
self.conn = None
self.timer = None
self.dest = 'examples'
self.address = address
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
self.num_messages = num_messages
self.sender_stats = None
self.receiver_stats = None
self.done = False
self.large_message = large_message
def timeout(self):
self.conn.close()
def check_if_done(self):
if not self.done and (self.n_sent == self.n_received
and self.n_sent == self.n_accepted):
self.done = True
self.sender_stats = get_link_info('Tx_IngressEgressOneRouterTest', self.address)
self.receiver_stats = get_link_info('Rx_IngressEgressOneRouterTest', self.address)
self.conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn,
target=self.dest,
name='Tx_IngressEgressOneRouterTest')
self.receiver = event.container.create_receiver(self.conn,
source=self.dest,
name='Rx_IngressEgressOneRouterTest')
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body=get_body(self.n_sent, self.large_message))
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.n_accepted += 1
self.check_if_done()
def run(self):
Container(self).run()
class RouteContainerEgressTest(MessagingHandler):
def __init__(self, route_container_addr, sender_addr, num_messages, large_message=False):
super(RouteContainerEgressTest, self).__init__()
self.sender_addr = sender_addr
self.route_container_addr = route_container_addr
self.timer = None
self.error = None
self.receiver = None
self.receiver_conn = None
self.dest = "myListener.1"
self.sender_conn = None
self.sender = None
self.start = False
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
self.num_messages = num_messages
self.sender_stats = None
self.receiver_stats = None
self.done = False
def check_if_done(self):
if not self.done and (self.n_sent == self.n_received
and self.n_sent == self.n_accepted):
self.done = True
self.sender_stats = get_link_info('Tx_RouteContainerEgressTest', self.sender_addr)
self.receiver_stats = get_link_info('Rx_RouteContainerEgressTest', self.route_container_addr)
self.receiver_conn.close()
self.sender_conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.receiver_conn = event.container.connect(self.route_container_addr)
self.receiver = event.container.create_receiver(self.receiver_conn,
source=self.dest,
name='Rx_RouteContainerEgressTest')
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received)
self.sender_conn.close()
self.receiver_conn.close()
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.n_accepted += 1
self.check_if_done()
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.start = True
self.sender_conn = event.container.connect(self.sender_addr)
self.sender = event.container.create_sender(self.sender_conn,
target=self.dest,
name='Tx_RouteContainerEgressTest')
def run(self):
Container(self).run()
class RouteContainerIngressTest(MessagingHandler):
def __init__(self, route_container_addr, receiver_addr):
super(RouteContainerIngressTest, self).__init__()
self.receiver_addr = receiver_addr
self.route_container_addr = route_container_addr
self.timer = None
self.error = None
self.receiver = None
self.receiver_conn = None
self.dest = "myListener.1"
self.sender_conn = None
self.sender = None
self.sender1 = None
self.start = False
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
self.num_messages = 20
self.sender_stats = None
self.sender1_stats = None
self.receiver_stats = None
self.done = False
def check_if_done(self):
if not self.done and (self.n_sent == self.n_received
and self.n_sent == self.n_accepted):
self.done = True
self.sender_stats = get_link_info('A', self.route_container_addr)
self.sender1_stats = get_link_info('B', self.route_container_addr)
self.receiver_stats = get_link_info('Rx_RouteContainerIngressTest', self.receiver_addr)
self.receiver_conn.close()
self.sender_conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.receiver_conn = event.container.connect(self.receiver_addr)
self.receiver = event.container.create_receiver(self.receiver_conn,
source=self.dest,
name='Rx_RouteContainerIngressTest')
self.sender_conn = event.container.connect(self.route_container_addr)
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received)
self.sender_conn.close()
self.receiver_conn.close()
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
self.sender1.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.n_accepted += 1
self.check_if_done()
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.start = True
# Create 2 senders. Each sender will send 10 messages each
self.sender = event.container.create_sender(self.sender_conn, self.dest, name="A")
self.sender1 = event.container.create_sender(self.sender_conn, self.dest, name="B")
def run(self):
Container(self).run()
class IngressEgressTransitLinkRouteTest(MessagingHandler):
def __init__(self, sender_addr, receiver_addr, num_messages, large_message=False):
super(IngressEgressTransitLinkRouteTest, self).__init__()
self.timer = None
self.receiver_conn = None
self.receiver = None
self.sender = None
self.sender_conn = None
self.dest = "pulp.task"
self.start = False
self.n_sent = 0
self.num_messages = num_messages
self.n_received = 0
self.n_accepted = 0
self.sender_addr = sender_addr
self.receiver_addr = receiver_addr
self.error = None
self.sender_stats = None
self.receiver_stats = None
self.done = False
self.large_message = large_message
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received)
self.sender_conn.close()
self.receiver_conn.close()
def check_if_done(self):
if not self.done and (self.n_sent == self.n_received
and self.n_sent == self.n_accepted):
self.done = True
self.sender_stats = get_link_info('Tx_IngressEgressTransitLinkRouteTest',
self.sender_addr)
self.receiver_stats = get_link_info('Rx_IngressEgressTransitLinkRouteTest',
self.receiver_addr)
self.receiver_conn.close()
self.sender_conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.receiver_conn = event.container.connect(self.receiver_addr)
self.receiver = event.container.create_receiver(self.receiver_conn,
source=self.dest,
name='Rx_IngressEgressTransitLinkRouteTest')
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.sender_conn = event.container.connect(self.sender_addr)
self.sender = event.container.create_sender(self.sender_conn,
target=self.dest,
name='Tx_IngressEgressTransitLinkRouteTest')
self.start = True
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body=get_body(self.n_sent, self.large_message))
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.n_accepted += 1
self.check_if_done()
def run(self):
Container(self).run()
class ReleasedDroppedPresettledCountTest(MessagingHandler):
def __init__(self, sender_addr, num_messages, large_message=False):
super(ReleasedDroppedPresettledCountTest, self).__init__()
self.timer = None
self.sender_conn = None
self.sender = None
self.error = None
self.n_sent = 0
self.num_messages = num_messages
self.sender_addr = sender_addr
self.sender_stats = None
# We are sending to a multicast address
self.dest = "multicast"
self.n_released = 0
self.expect_released = 10
self.done = False
self.large_message = large_message
def check_if_done(self):
if not self.done and self.expect_released == self.n_released:
self.done = True
self.sender_stats = get_link_info('ReleasedDroppedPresettledCountTest',
self.sender_addr)
self.sender_conn.close()
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_released=%d " % (self.n_sent, self.n_released)
self.sender_conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.sender_conn = event.container.connect(self.sender_addr)
# Note that this is an anonymous link which will be granted credit w/o
# blocking for consumers. Therefore all messages sent to this address
# will be dropped
self.sender = event.container.create_sender(self.sender_conn,
name='ReleasedDroppedPresettledCountTest')
def on_sendable(self, event):
# We are sending a total of 20 deliveries. 10 unsettled and 10 pre-settled to a multicast address
if self.n_sent < self.num_messages:
msg = Message(body=get_body(self.n_sent, self.large_message))
msg.address = self.dest
dlv = self.sender.send(msg)
if self.n_sent < 10:
dlv.settle()
self.n_sent += 1
def on_released(self, event):
self.n_released += 1
self.check_if_done()
def run(self):
Container(self).run()
class RejectedDeliveriesTest(MessagingHandler):
def __init__(self, addr, num_messages, large_message=False):
super(RejectedDeliveriesTest, self).__init__(auto_accept=False)
self.addr = addr
self.dest = "someaddress"
self.error = None
self.n_sent = 0
self.num_messages = num_messages
self.n_rejected = 0
self.sender_conn = None
self.receiver_conn = None
self.timer = None
self.sender = None
self.receiver = None
self.sender_stats = None
self.receiver_stats = None
self.done = False
self.large_message = large_message
def check_if_done(self):
if not self.done and self.n_rejected == self.num_messages:
self.done = True
self.sender_stats = get_link_info('Tx_RejectedDeliveriesTest', self.addr)
self.receiver_stats = get_link_info('Rx_RejectedDeliveriesTest', self.addr)
self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_rejected=%d " % (self.n_sent, self.n_rejected)
self.sender_conn.close()
self.receiver_conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.sender_conn = event.container.connect(self.addr)
self.sender = event.container.create_sender(self.sender_conn,
target=self.dest,
name='Tx_RejectedDeliveriesTest')
self.receiver_conn = event.container.connect(self.addr)
self.receiver = event.container.create_receiver(self.receiver_conn,
source=self.dest,
name='Rx_RejectedDeliveriesTest')
def on_rejected(self, event):
self.n_rejected += 1
self.check_if_done()
def on_message(self, event):
# We will reject every delivery we receive.
self.reject(event.delivery)
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body=get_body(self.n_sent, self.large_message))
self.sender.send(msg)
self.n_sent += 1
def run(self):
Container(self).run()
class ModifiedDeliveriesTest(MessagingHandler):
def __init__(self, addr, num_messages, large_message=False):
super(ModifiedDeliveriesTest, self).__init__(auto_accept=False)
self.addr = addr
self.dest = "someaddress"
self.error = None
self.n_sent = 0
self.num_messages = num_messages
self.n_modified = 0
self.sender_conn = None
self.receiver_conn = None
self.timer = None
self.sender = None
self.receiver = None
self.n_received = 0
self.sender_stats = None
self.receiver_stats = None
self.done = False
self.large_message = large_message
def check_if_done(self):
if not self.done and self.n_modified == self.num_messages:
self.done = True
self.sender_stats = get_link_info('Tx_ModifiedDeliveriesTest', self.addr)
self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_modified=%d " % (self.n_sent, self.n_modified)
self.sender_conn.close()
self.receiver_conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.sender_conn = event.container.connect(self.addr)
self.sender = event.container.create_sender(self.sender_conn,
target=self.dest,
name='Tx_ModifiedDeliveriesTest')
self.receiver_conn = event.container.connect(self.addr)
self.receiver = event.container.create_receiver(self.receiver_conn,
source=self.dest,
name='Rx_ModifiedDeliveriesTest')
def on_released(self, event):
if event.delivery.remote_state == Delivery.MODIFIED:
self.n_modified += 1
self.check_if_done()
def on_message(self, event):
# The messages have arrived at the receiver but we will not settle the message and instead just closed the
# connection. Since the router did not receive the acknowledgements, it will send back MODIFIED dispositions
# to the sender.
self.n_received += 1
# After 10 messages are received, simply close the receiver connection without acknowledging the messages
if self.n_received == self.num_messages:
self.receiver_stats = get_link_info('Rx_ModifiedDeliveriesTest', self.addr)
self.receiver_conn.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body=get_body(self.n_sent, self.large_message))
self.sender.send(msg)
self.n_sent += 1
def run(self):
Container(self).run()