blob: 603764a41306011cb03e45cdef447ed5e80a31b6 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from proton import Message
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, unittest, TestTimeout
from proton.handlers import MessagingHandler
from proton.reactor import Container
class RouterTest(TestCase):
inter_router_port = None
@classmethod
def setUpClass(cls):
"""Start a router"""
super(RouterTest, cls).setUpClass()
def router(name, connection, args=[]):
config = [
('router', {'mode': 'interior', 'id': name}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'spread', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
connection
]
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=args))
cls.routers = []
inter_router_port = cls.tester.get_port()
router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}), ["-T"])
def test_01_denied_link(self):
test = DenyLinkTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/deny")
test.run()
self.assertIsNone(test.error)
def test_02_discard_deliveries(self):
test = DiscardTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/discard")
test.run()
self.assertIsNone(test.error)
def test_03_presettled_source(self):
test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source_ps", 300, 300)
test.run()
self.assertIsNone(test.error)
def test_04_unsettled_source(self):
test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source", 300, 0)
test.run()
self.assertIsNone(test.error)
def test_05_echo_attach_detach(self):
test = EchoTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/echo")
test.run()
self.assertIsNone(test.error)
class DenyLinkTest(MessagingHandler):
def __init__(self, host, address):
super(DenyLinkTest, self).__init__(prefetch=0)
self.host = host
self.address = address
self.conn = None
self.error = None
self.receiver = None
self.sender = None
self.receiver_failed = False
self.sender_failed = False
def timeout(self):
self.error = "Timeout Expired: receiver_failed=%s sender_failed=%s" %\
("yes" if self.receiver_failed else "no",
"yes" if self.sender_failed else "no")
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.host)
self.receiver = event.container.create_receiver(self.conn, self.address)
self.sender = event.container.create_sender(self.conn, self.address)
def on_link_error(self, event):
if event.receiver == self.receiver:
self.receiver_failed = True
if event.sender == self.sender:
self.sender_failed = True
if self.receiver_failed and self.sender_failed:
self.conn.close()
self.timer.cancel()
def run(self):
Container(self).run()
class DiscardTest(MessagingHandler):
def __init__(self, host, address):
super(DiscardTest, self).__init__(prefetch=0)
self.host = host
self.address = address
self.conn = None
self.error = None
self.sender = None
self.count = 300
self.sent = 0
self.rejected = 0
def timeout(self):
self.error = "Timeout Expired: n_sent=%d n_rejected=%d" % (self.sent, self.rejected)
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.host)
self.sender = event.container.create_sender(self.conn, self.address)
def on_sendable(self, event):
while self.sender.credit > 0 and self.sent < self.count:
msg = Message(body="Discard Test")
self.sender.send(msg)
self.sent += 1
def on_rejected(self, event):
self.rejected += 1
self.conn.close()
self.timer.cancel()
def on_link_error(self, event):
if event.receiver == self.receiver:
self.receiver_failed = True
if event.sender == self.sender:
self.sender_failed = True
if self.receiver_failed and self.sender_failed:
self.conn.close()
self.timer.cancel()
def run(self):
Container(self).run()
class SourceTest(MessagingHandler):
def __init__(self, host, address, count, expected_ps):
super(SourceTest, self).__init__(prefetch=0)
self.host = host
self.address = address
self.expected_ps = expected_ps
self.conn = None
self.error = None
self.receiver = None
self.count = count
self.n_credit_given = 0
self.n_rcvd = 0
self.n_rcvd_ps = 0
def timeout(self):
self.error = "Timeout Expired: n_rcvd=%d" % (self.n_rcvd)
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.host)
self.receiver = event.container.create_receiver(self.conn, self.address)
self.receiver.flow(3)
self.n_credit_given = 3
def on_message(self, event):
dlv = event.delivery
if dlv.settled:
self.n_rcvd_ps += 1
self.n_rcvd += 1
if self.n_rcvd == self.count:
self.conn.close()
self.timer.cancel()
if self.n_rcvd_ps != self.expected_ps:
self.error = "Received %d deliveries, %d were settled (expected %d)" %\
(self.n_rcvd, self.n_rcvd_ps, self.expected_ps)
elif self.n_rcvd == self.n_credit_given:
self.receiver.flow(5)
self.n_credit_given += 5
def run(self):
Container(self).run()
class EchoTest(MessagingHandler):
def __init__(self, host, address):
super(EchoTest, self).__init__(prefetch=0)
self.host = host
self.address = address
self.conn = None
self.error = None
self.action = "Connecting to router"
self.receiver = None
self.sender = None
def timeout(self):
self.error = "Timeout Expired while attempting action: %s" % self.action
self.conn.close()
def fail(self, error):
self.error = error
self.conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.host)
self.receiver = event.container.create_receiver(self.conn, self.address)
def on_link_opening(self, event):
if event.sender:
self.action = "Attaching incoming echoed link"
self.sender = event.sender
if event.sender.remote_source.address == self.address:
event.sender.source.address = self.address
event.sender.open()
else:
self.fail("Incorrect address on incoming sender: got %s, expected %s" %
(event.sender.remote_source.address, self.address))
def on_link_opened(self, event):
if event.receiver == self.receiver:
self.action = "Closing the echoed link"
self.receiver.close()
def on_link_closed(self, event):
if event.receiver == self.receiver:
self.conn.close()
self.timer.cancel()
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())