blob: ff6475a75aacd78249eec5c1a0ec1d01387ccc78 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from time import sleep
from proton import Condition, Message, Delivery, Timeout
from system_test import TestCase, Qdrouterd, TIMEOUT
from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_dispatch.management.client import Node
class OneRouterModifiedTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(OneRouterModifiedTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def test_one_router_modified_counts(self):
address = self.router.addresses[0]
test = ModifieddDeliveriesTest(address)
test.run()
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_modified_index = outs.attribute_names.index('modifiedDeliveries')
results = outs.results[0]
self.assertEqual(results[deliveries_modified_index], 10)
class OneRouterRejectedTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(OneRouterRejectedTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def test_one_router_rejected_counts(self):
address = self.router.addresses[0]
test = RejectedDeliveriesTest(address)
test.run()
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_rejected_index = outs.attribute_names.index('rejectedDeliveries')
results = outs.results[0]
self.assertEqual(results[deliveries_rejected_index], 10)
class OneRouterReleasedDroppedPresettledTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(OneRouterReleasedDroppedPresettledTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def test_one_router_released_dropped_counts(self):
address = self.router.addresses[0]
test = ReleasedDroppedPresettledCountTest(address)
test.run()
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_dropped_presettled_index = outs.attribute_names.index('droppedPresettledDeliveries')
deliveries_released_index = outs.attribute_names.index('releasedDeliveries')
deliveries_presettled_index = outs.attribute_names.index('presettledDeliveries')
results = outs.results[0]
self.assertEqual(results[deliveries_dropped_presettled_index], 10)
self.assertEqual(results[deliveries_released_index], 10)
self.assertEqual(results[deliveries_presettled_index], 10)
class TwoRouterReleasedDroppedPresettledTest(TestCase):
@classmethod
def setUpClass(cls):
super(TwoRouterReleasedDroppedPresettledTest, cls).setUpClass()
listen_port_1 = cls.tester.get_port()
listen_port_2 = cls.tester.get_port()
listen_port_inter_router = cls.tester.get_port()
config_1 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'A'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
])
config_2 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router,
'verifyHostname': 'no'}),
])
cls.routers = []
cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
cls.routers[1].wait_router_connected('A')
def test_two_router_released_dropped_counts(self):
address = self.routers[0].addresses[0]
# Send presettled and settled messages to router 1.
# Make sure the hello messages (which are presettled dont show up in the counts
test = ReleasedDroppedPresettledCountTest(address)
test.run()
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_dropped_presettled_index = outs.attribute_names.index('droppedPresettledDeliveries')
deliveries_released_index = outs.attribute_names.index('releasedDeliveries')
deliveries_presettled_index = outs.attribute_names.index('presettledDeliveries')
results = outs.results[0]
self.assertEqual(results[deliveries_dropped_presettled_index], 10)
self.assertEqual(results[deliveries_released_index], 10)
self.assertEqual(results[deliveries_presettled_index], 10)
class LinkRouteIngressEgressTransitTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start three routers"""
super(LinkRouteIngressEgressTransitTest, cls).setUpClass()
def router(name, connection):
config = [
('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
] + connection
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
a_listener_port = cls.tester.get_port()
b_listener_port = cls.tester.get_port()
c_listener_port = cls.tester.get_port()
test_tag_listener_port = cls.tester.get_port()
router('A',
[
('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
])
router('B',
[
('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'name': 'test-tag', 'role': 'route-container', 'host': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'routerC', 'role': 'inter-router', 'host': '0.0.0.0', 'port': c_listener_port}),
('linkRoute', {'prefix': 'pulp.task', 'connection': 'broker', 'direction': 'in'}),
('linkRoute', {'prefix': 'pulp.task', 'connection': 'broker', 'direction': 'out'}),
]
)
router('C',
[
('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': c_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
('linkRoute', {'prefix': 'pulp.task', 'direction': 'in'}),
('linkRoute', {'prefix': 'pulp.task', 'direction': 'out'}),
]
)
# Wait for the routers to locate each other, and for route propagation
# to settle
cls.routers[2].wait_router_connected('QDR.B')
cls.routers[1].wait_router_connected('QDR.C')
cls.routers[2].wait_address("pulp.task", remotes=1, delay=3)
# This is not a classic router network in the sense that QDR.A and D are acting as brokers. We allow a little
# bit more time for the routers to stabilize.
sleep(2)
def test_link_route_ingress_egress_transit_counts(self):
address1 = self.routers[2].addresses[0]
address2 = self.routers[2].addresses[0]
local_node = Node.connect(address1, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
deliveries_transit_index = outs.attribute_names.index('deliveriesTransit')
results = outs.results[0]
pre_ingress_count = results[deliveries_ingress_index]
pre_egress_count = results[deliveries_egress_index]
pre_transit_count = results[deliveries_transit_index]
# Send and receive on the same router, router C
test = IngressEgressTransitLinkRouteTest(address1, address2)
test.run()
local_node = Node.connect(address1, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
deliveries_transit_index = outs.attribute_names.index('deliveriesTransit')
results = outs.results[0]
post_ingress_count = results[deliveries_ingress_index]
post_egress_count = results[deliveries_egress_index]
post_transit_count = results[deliveries_transit_index]
# 10 messages entered the router, and 10 messages were echoed by router A and one mgmt request
self.assertEqual(post_ingress_count - pre_ingress_count, 21)
# 10 messages + 1 mgmt request
self.assertEqual(post_egress_count - pre_egress_count, 11)
# 10 messages went out this router
self.assertEqual(post_transit_count - pre_transit_count, 10)
class TwoRouterIngressEgressTest(TestCase):
@classmethod
def setUpClass(cls):
super(TwoRouterIngressEgressTest, cls).setUpClass()
listen_port_1 = cls.tester.get_port()
listen_port_2 = cls.tester.get_port()
listen_port_inter_router = cls.tester.get_port()
config_1 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'A'}),
('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
])
config_2 = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'B'}),
('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router,
'verifyHostname': 'no'}),
])
cls.routers = []
cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
cls.routers[1].wait_router_connected('A')
def test_two_router_ingress_egress_counts(self):
address1 = self.routers[0].addresses[0]
address2 = self.routers[1].addresses[0]
# Gather the values for deliveries_ingress and deliveries_egress before running the test.
local_node = Node.connect(address1, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
results = outs.results[0]
pre_deliveries_ingresss = results[deliveries_ingress_index]
local_node = Node.connect(address2, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
deliveries_accepted_index = outs.attribute_names.index('acceptedDeliveries')
results = outs.results[0]
pre_deliveries_egress = results[deliveries_egress_index]
pre_deliveries_accepted = results[deliveries_accepted_index]
# Now run the test.
test = IngressEgressTwoRouterTest(address1, address2)
test.run()
# Gather the values for deliveries_ingress and deliveries_egress after running the test.
local_node = Node.connect(address1, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
results = outs.results[0]
post_deliveries_ingresss = results[deliveries_ingress_index]
local_node = Node.connect(address2, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
deliveries_accepted_index = outs.attribute_names.index('acceptedDeliveries')
results = outs.results[0]
post_deliveries_egress = results[deliveries_egress_index]
post_deliveries_accepted = results[deliveries_accepted_index]
accepted_deliveries_diff = post_deliveries_accepted - pre_deliveries_accepted
self.assertEqual(post_deliveries_ingresss - pre_deliveries_ingresss, 11)
self.assertEqual(post_deliveries_egress - pre_deliveries_egress, 11)
# The management requests are counted in the acceptedDeliveries, so it is difficult to measure the
# exact number of accepted deliveries at this point in time. But it must at least be 10 since
# we know for sure from the test that the 10 dispositions related to the 10 sent messages
# were definitely received
self.assertTrue(accepted_deliveries_diff >= 10)
class OneRouterIngressEgressTest(TestCase):
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(OneRouterIngressEgressTest, cls).setUpClass()
listen_port = cls.tester.get_port()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'A'}),
('listener', {'port': listen_port, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'})])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def test_one_router_ingress_egress_counts(self):
address = self.router.addresses[0]
test = IngressEgressOneRouterTest(address)
test.run()
local_node = Node.connect(address, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_index = outs.attribute_names.index('deliveriesIngress')
deliveries_egress_index = outs.attribute_names.index('deliveriesEgress')
results = outs.results[0]
self.assertEqual(results[deliveries_ingress_index], 11)
self.assertEqual(results[deliveries_egress_index], 10)
class RouteContainerEgressCount(TestCase):
@classmethod
def setUpClass(cls):
super(RouteContainerEgressCount, cls).setUpClass()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
#
# Create a general-purpose listener for sending and receiving deliveries
#
('listener', {'port': cls.tester.get_port()}),
# Create a route-container listener and give it a name myListener.
# Later on we will create an autoLink which has a connection property of myListener.
#
('listener', {'role': 'route-container', 'name': 'myListener', 'port': cls.tester.get_port()}),
#
# Note here that the connection is set to a previously declared 'myListener'
#
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'in'}),
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'out'}),
])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def test_route_container_egress(self):
regular_addr = self.router.addresses[0]
route_container_addr = self.router.addresses[1]
test = RouteContainerEgressTest(route_container_addr, regular_addr)
test.run()
local_node = Node.connect(regular_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_egress_route_container_index = outs.attribute_names.index('deliveriesEgressRouteContainer')
results = outs.results[0]
self.assertEqual(results[deliveries_egress_route_container_index], 10)
class RouteContainerIngressCount(TestCase):
@classmethod
def setUpClass(cls):
super(RouteContainerIngressCount, cls).setUpClass()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
#
# Create a general-purpose listener for sending and receiving deliveries
#
('listener', {'port': cls.tester.get_port()}),
# Create a route-container listener and give it a name myListener.
# Later on we will create an autoLink which has a connection property of myListener.
#
('listener', {'role': 'route-container', 'name': 'myListener', 'port': cls.tester.get_port()}),
#
# Note here that the connection is set to a previously declared 'myListener'
#
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'in'}),
('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'out'}),
])
cls.router = cls.tester.qdrouterd(name="A", config=config, wait=True)
def test_route_container_ingress(self):
regular_addr = self.router.addresses[0]
route_container_addr = self.router.addresses[1]
test = RouteContainerIngressTest(route_container_addr, regular_addr)
test.run()
local_node = Node.connect(regular_addr, timeout=TIMEOUT)
outs = local_node.query(type='org.apache.qpid.dispatch.router')
deliveries_ingress_route_container_index = outs.attribute_names.index('deliveriesIngressRouteContainer')
results = outs.results[0]
self.assertEqual(results[deliveries_ingress_route_container_index], 20)
class IngressEgressTwoRouterTest(MessagingHandler):
def __init__(self, address1, address2):
super(IngressEgressTwoRouterTest, self).__init__()
self.sender = None
self.receiver = None
self.conn_sender = None
self.conn_recv = None
self.timer = None
self.dest = 'examples'
self.address1 = address1
self.address2 = address2
self.n_sent = 0
self.n_received = 0
self.num_messages = 10
self.start = False
self.n_accept = 0
def timeout(self):
self.conn_sender.close()
self.conn_recv.close()
def check_if_done(self):
if self.num_messages == self.n_received and self.n_accept == self.num_messages:
self.conn_sender.close()
self.conn_recv.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn_recv = event.container.connect(self.address2)
self.receiver = event.container.create_receiver(self.conn_recv, self.dest)
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.start = True
self.conn_sender = event.container.connect(self.address1)
self.sender = event.container.create_sender(self.conn_sender,
self.dest)
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
if event.sender:
self.n_accept += 1
self.check_if_done()
def run(self):
Container(self).run()
class IngressEgressOneRouterTest(MessagingHandler):
def __init__(self, address):
super(IngressEgressOneRouterTest, self).__init__()
self.sender = None
self.receiver = None
self.conn = None
self.timer = None
self.dest = 'examples'
self.address = address
self.n_sent = 0
self.n_received = 0
self.num_messages = 10
def timeout(self):
self.conn.close()
def check_if_done(self):
if self.n_sent == self.n_received:
self.conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn, self.dest)
self.receiver = event.container.create_receiver(self.conn, self.dest)
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class RouteContainerEgressTest(MessagingHandler):
def __init__(self, route_container_addr, sender_addr):
super(RouteContainerEgressTest, self).__init__()
self.sender_addr = sender_addr
self.route_container_addr = route_container_addr
self.timer = None
self.error = None
self.receiver = None
self.receiver_conn = None
self.dest = "myListener.1"
self.sender_conn = None
self.sender = None
self.start = False
self.n_sent = 0
self.n_received = 0
self.num_messages = 10
def check_if_done(self):
if self.n_sent == self.n_received:
self.receiver_conn.close()
self.sender_conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.receiver_conn = event.container.connect(self.route_container_addr)
self.receiver = event.container.create_receiver(self.receiver_conn, self.dest)
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received)
self.sender_conn.close()
self.receiver_conn.close()
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.check_if_done()
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.start = True
self.sender_conn = event.container.connect(self.sender_addr)
self.sender = event.container.create_sender(self.sender_conn, self.dest)
def run(self):
Container(self).run()
class RouteContainerIngressTest(MessagingHandler):
def __init__(self, route_container_addr, receiver_addr):
super(RouteContainerIngressTest, self).__init__()
self.receiver_addr = receiver_addr
self.route_container_addr = route_container_addr
self.timer = None
self.error = None
self.receiver = None
self.receiver_conn = None
self.dest = "myListener.1"
self.sender_conn = None
self.sender = None
self.sender1 = None
self.start = False
self.n_sent = 0
self.n_received = 0
self.num_messages = 20
def check_if_done(self):
if self.n_sent == self.n_received:
self.receiver_conn.close()
self.sender_conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.receiver_conn = event.container.connect(self.receiver_addr)
self.receiver = event.container.create_receiver(self.receiver_conn, self.dest)
self.sender_conn = event.container.connect(self.route_container_addr)
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received)
self.sender_conn.close()
self.receiver_conn.close()
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
self.sender1.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.check_if_done()
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.start = True
# Create 2 senders. Each sender will send 10 messages each
self.sender = event.container.create_sender(self.sender_conn, self.dest, name="A")
self.sender1 = event.container.create_sender(self.sender_conn, self.dest, name="B")
def run(self):
Container(self).run()
class IngressEgressTransitLinkRouteTest(MessagingHandler):
def __init__(self, sender_addr, receiver_addr):
super(IngressEgressTransitLinkRouteTest, self).__init__()
self.timer = None
self.receiver_conn = None
self.receiver = None
self.sender = None
self.sender_conn = None
self.dest = "pulp.task"
self.start = False
self.n_sent = 0
self.num_messages = 10
self.n_received = 0
self.sender_addr = sender_addr
self.receiver_addr = receiver_addr
self.error = None
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d self.n_received=%d" % (self.n_sent, self.self.n_received)
self.sender_conn.close()
self.receiver_conn.close()
def check_if_done(self):
if self.n_sent == self.n_received:
self.receiver_conn.close()
self.sender_conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.receiver_conn = event.container.connect(self.receiver_addr)
self.receiver = event.container.create_receiver(self.receiver_conn, self.dest)
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.sender_conn = event.container.connect(self.sender_addr)
self.sender = event.container.create_sender(self.sender_conn,
self.dest)
self.start = True
def on_sendable(self, event):
if not self.start:
return
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def on_message(self, event):
if event.receiver == self.receiver:
self.n_received += 1
def on_accepted(self, event):
self.check_if_done()
def run(self):
Container(self).run()
class ReleasedDroppedPresettledCountTest(MessagingHandler):
def __init__(self, sender_addr):
super(ReleasedDroppedPresettledCountTest, self).__init__()
self.timer = None
self.sender_conn = None
self.sender = None
self.error = None
self.n_sent = 0
self.num_messages = 20
self.sender_addr = sender_addr
# We are sending to a multicast address
self.dest = "multicast"
self.n_released = 0
self.expect_released = 10
def check_if_done(self):
if self.expect_released == self.n_released:
self.sender_conn.close()
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_released=%d " % (self.n_sent, self.n_released)
self.sender_conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.sender_conn = event.container.connect(self.sender_addr)
# Note that there are no receivers. All messages sent to this address will be dropped
self.sender = event.container.create_sender(self.sender_conn, self.dest)
def on_sendable(self, event):
# We are sending a total of 20 deliveries. 10 unsettled and 10 pre-settled to a multicast address
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
dlv = self.sender.send(msg)
if self.n_sent < 10:
dlv.settle()
self.n_sent += 1
def on_released(self, event):
self.n_released += 1
self.check_if_done()
def run(self):
Container(self).run()
class RejectedDeliveriesTest(MessagingHandler):
def __init__(self, addr):
super(RejectedDeliveriesTest, self).__init__(auto_accept=False)
self.addr = addr
self.dest = "someaddress"
self.error = None
self.n_sent = 0
self.num_messages = 10
self.n_rejected = 0
self.sender_conn = None
self.receiver_conn = None
self.timer = None
self.sender = None
self.receiver = None
def check_if_done(self):
if self.n_rejected == self.num_messages:
self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_rejected=%d " % (self.n_sent, self.n_rejected)
self.sender_conn.close()
self.receiver_conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.sender_conn = event.container.connect(self.addr)
self.sender = event.container.create_sender(self.sender_conn, self.dest)
self.receiver_conn = event.container.connect(self.addr)
self.receiver = event.container.create_receiver(self.receiver_conn, self.dest)
def on_rejected(self, event):
self.n_rejected += 1
self.check_if_done()
def on_message(self, event):
# We will reject every delivery we receive.
self.reject(event.delivery)
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message( body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def run(self):
Container(self).run()
class ModifieddDeliveriesTest(MessagingHandler):
def __init__(self, addr):
super(ModifieddDeliveriesTest, self).__init__(auto_accept=False)
self.addr = addr
self.dest = "someaddress"
self.error = None
self.n_sent = 0
self.num_messages = 10
self.n_modified = 0
self.sender_conn = None
self.receiver_conn = None
self.timer = None
self.sender = None
self.receiver = None
self.n_received = 0
def check_if_done(self):
if self.n_modified == self.num_messages:
self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
def timeout(self):
self.error = "Timeout Expired: self.n_sent=%d, self.self.n_modified=%d " % (self.n_sent, self.n_modified)
self.sender_conn.close()
self.receiver_conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.sender_conn = event.container.connect(self.addr)
self.sender = event.container.create_sender(self.sender_conn, self.dest)
self.receiver_conn = event.container.connect(self.addr)
self.receiver = event.container.create_receiver(self.receiver_conn, self.dest)
def on_released(self, event):
if event.delivery.remote_state == Delivery.MODIFIED:
self.n_modified += 1
self.check_if_done()
def on_message(self, event):
# The messages have arrived at the receiver but we will not settle the message and instead just closed the
# connection. Since the router did not receive the acknowledgements, it will send back MODIFIED dispositions
# to the sender.
self.n_received += 1
# After 10 messages are received, simply close the receiver connection without acknowledging the messages
if self.n_received == self.num_messages:
self.receiver_conn.close()
def on_sendable(self, event):
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
self.sender.send(msg)
self.n_sent += 1
def run(self):
Container(self).run()