|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one | 
|  | # or more contributor license agreements.  See the NOTICE file | 
|  | # distributed with this work for additional information | 
|  | # regarding copyright ownership.  The ASF licenses this file | 
|  | # to you under the Apache License, Version 2.0 (the | 
|  | # "License"); you may not use this file except in compliance | 
|  | # with the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, | 
|  | # software distributed under the License is distributed on an | 
|  | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | # KIND, either express or implied.  See the License for the | 
|  | # specific language governing permissions and limitations | 
|  | # under the License. | 
|  | # | 
|  |  | 
|  | from __future__ import unicode_literals | 
|  | from __future__ import division | 
|  | from __future__ import absolute_import | 
|  | from __future__ import print_function | 
|  |  | 
|  | from system_test import TestCase, Qdrouterd, main_module, TestTimeout, unittest, TIMEOUT | 
|  | from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler | 
|  | from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler | 
|  | from system_tests_drain_support import DrainMessagesMoreHandler | 
|  | from proton.handlers import MessagingHandler | 
|  | from proton import Message | 
|  | from proton.reactor import Container | 
|  | from time import sleep | 
|  |  | 
|  |  | 
|  | class DrainSupportTest(TestCase): | 
|  |  | 
|  | @classmethod | 
|  | def setUpClass(cls): | 
|  | """ | 
|  | Set up two routers: | 
|  | Router 'test-router' is the system under test. | 
|  | Router 'broker' acts as a link route sink/source. | 
|  | The link route uses prefix 'abc'. | 
|  | """ | 
|  | super(DrainSupportTest, cls).setUpClass() | 
|  |  | 
|  | test_listener_port   = cls.tester.get_port() | 
|  | broker_listener_port = cls.tester.get_port() | 
|  |  | 
|  | # Configure and start 'broker' | 
|  | bname = "broker" | 
|  | bconfig = Qdrouterd.Config([ | 
|  | ('router', {'mode': 'standalone', 'id': 'Broker'}), | 
|  | ('listener', {'role': 'normal', | 
|  | 'host': '0.0.0.0', 'port': broker_listener_port, 'linkCapacity': 100, 'saslMechanisms': 'ANONYMOUS'}), | 
|  | ]) | 
|  | cls.broker = cls.tester.qdrouterd(bname, bconfig, wait=True) | 
|  |  | 
|  | # Configure and start test-router | 
|  | name = "test-router" | 
|  | config = Qdrouterd.Config([ | 
|  | ('router', {'mode': 'standalone', 'id': 'QDR'}), | 
|  |  | 
|  | # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages | 
|  | ('listener', {'role': 'normal', | 
|  | 'host': '0.0.0.0', 'port': test_listener_port, | 
|  | 'linkCapacity': 10, 'saslMechanisms': 'ANONYMOUS'}), | 
|  |  | 
|  | # The DrainMessagesMoreHandler accepts a src/tgt address that may be link-routed. | 
|  | # This defines the link route to 'broker' and the 'abc' prefix. | 
|  | ('connector', {'name': 'broker1-conn', 'role': 'route-container', | 
|  | 'host': '0.0.0.0', 'port': broker_listener_port, | 
|  | 'saslMechanisms': 'ANONYMOUS'}), | 
|  | ('linkRoute', {'prefix': 'abc', 'direction': 'out', 'connection': 'broker1-conn'}), | 
|  | ('linkRoute', {'prefix': 'abc', 'direction': 'in', 'connection': 'broker1-conn'}), | 
|  | ]) | 
|  |  | 
|  | cls.router = cls.tester.qdrouterd(name, config, wait=False) | 
|  | cls.address = cls.router.addresses[0] | 
|  |  | 
|  | sleep(4)  # starting router with wait=True hangs. sleep for now | 
|  |  | 
|  | def test_drain_support_1_all_messages(self): | 
|  | drain_support = DrainMessagesHandler(self.address) | 
|  | drain_support.run() | 
|  | self.assertEqual(drain_support.error, None) | 
|  |  | 
|  | def test_drain_support_2_one_message(self): | 
|  | drain_support = DrainOneMessageHandler(self.address) | 
|  | drain_support.run() | 
|  | self.assertEqual(drain_support.error, None) | 
|  |  | 
|  | def test_drain_support_3_no_messages(self): | 
|  | drain_support = DrainNoMessagesHandler(self.address) | 
|  | drain_support.run() | 
|  | self.assertEqual(drain_support.error, None) | 
|  |  | 
|  | def test_drain_support_4_no_more_messages(self): | 
|  | drain_support = DrainNoMoreMessagesHandler(self.address) | 
|  | drain_support.run() | 
|  | self.assertEqual(drain_support.error, None) | 
|  |  | 
|  | def test_drain_support_5_drain_then_more_messages_local(self): | 
|  | drain_support = DrainMessagesMoreHandler(self.address, "org.apache.dev") | 
|  | drain_support.run() | 
|  | self.assertEqual(drain_support.error, None) | 
|  |  | 
|  | def test_drain_support_5_drain_then_more_messages_routed(self): | 
|  | drain_support = DrainMessagesMoreHandler(self.address, "abc") | 
|  | drain_support.run() | 
|  | self.assertEqual(drain_support.error, None) | 
|  |  | 
|  |  | 
|  | class ReceiverDropsOffDrainTest(TestCase): | 
|  |  | 
|  | @classmethod | 
|  | def setUpClass(cls): | 
|  | super(ReceiverDropsOffDrainTest, cls).setUpClass() | 
|  | config = Qdrouterd.Config([ | 
|  | ('router', {'mode': 'standalone', 'id': 'Broker'}), | 
|  | ('listener', {'role': 'normal', | 
|  | 'port': cls.tester.get_port(), | 
|  | 'saslMechanisms': 'ANONYMOUS'}), | 
|  | ]) | 
|  |  | 
|  | cls.router = cls.tester.qdrouterd("A", config, wait=True) | 
|  | cls.address = cls.router.addresses[0] | 
|  |  | 
|  | def test_receiver_drops_off_sender_receives_drain(self): | 
|  | test = ReceiverDropsOffSenderDrain(self.address, "examples") | 
|  | test.run() | 
|  | self.assertIsNone(test.error) | 
|  |  | 
|  |  | 
|  | class ReceiverDropsOffSenderDrain(MessagingHandler): | 
|  | def __init__(self, address, dest): | 
|  | super(ReceiverDropsOffSenderDrain, self).__init__() | 
|  | self.sender_conn = None | 
|  | self.receiver_conn = None | 
|  | self.sender = None | 
|  | self.receiver = None | 
|  | self.error = None | 
|  | self.sender_drained = False | 
|  | self.address = address | 
|  | self.dest = dest | 
|  | self.num_msgs = 0 | 
|  | self.receiver_closed = False | 
|  | self.drained = 0 | 
|  | self.expected_drained = 249 | 
|  | self.sender_drained = False | 
|  | self.timer = None | 
|  |  | 
|  | # Second receiver link opened. | 
|  | self.sec_recv_link_opened = False | 
|  |  | 
|  | def timeout(self): | 
|  | if not self.error: | 
|  | self.error = "Timeout Expired: Sender was not drained. Expected " \ | 
|  | "drained=%s, actual drain=%s " % \ | 
|  | (self.expected_drained, self.drained) | 
|  | self.sender_conn.close() | 
|  | self.receiver_conn.close() | 
|  |  | 
|  | def on_start(self, event): | 
|  | # Create sender and receiver in two separate connections | 
|  | self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) | 
|  | self.sender_conn = event.container.connect(self.address) | 
|  | self.receiver_conn = event.container.connect(self.address) | 
|  | self.sender = event.container.create_sender(self.sender_conn, | 
|  | self.dest) | 
|  | self.receiver = event.container.create_receiver(self.receiver_conn, | 
|  | self.dest) | 
|  |  | 
|  | def on_sendable(self, event): | 
|  | # Send just one message for now | 
|  | if self.num_msgs < 1: | 
|  | self.num_msgs += 1 | 
|  | msg = Message(body={'number': 1}) | 
|  | self.sender.send(msg) | 
|  |  | 
|  | def on_message(self, event): | 
|  | # As soon as the receiver receives the message, close the receiver | 
|  | if event.receiver == self.receiver: | 
|  | if self.sec_recv_link_opened and self.sender_drained: | 
|  |  | 
|  | # Make sure this is the same message body that was | 
|  | # sent by the newly created receiver | 
|  | if event.message.body[u'number'] == 3: | 
|  | self.receiver.close() | 
|  | self.error = None | 
|  | self.sender_conn.close() | 
|  | self.timer.cancel() | 
|  | self.receiver_conn.close() | 
|  | else: | 
|  | self.receiver.close() | 
|  | self.receiver_closed = True | 
|  |  | 
|  | def on_link_opened(self, event): | 
|  | if self.sender_drained: | 
|  | if event.receiver == self.receiver: | 
|  | self.sec_recv_link_opened = True | 
|  |  | 
|  | if self.num_msgs < 3: | 
|  | # Send a message after the sender has been drained | 
|  | # and a new receiver has been created | 
|  | # and make sure that the message has reached the receiver | 
|  | self.num_msgs += 1 | 
|  | msg = Message(body={'number': 3}) | 
|  | self.sender.send(msg) | 
|  |  | 
|  | def on_link_closed(self, event): | 
|  | if event.receiver == self.receiver: | 
|  | # The receiver link is closed. The sender still have a credit | 
|  | # of 249. Now send a message. The receiver is already gone at | 
|  | # this point. The router will receive this message and see that | 
|  | # there are no receivers and it will send a drain to the sender | 
|  | # This test will not work without the fix for DISPATCH-1090 | 
|  | self.num_msgs += 1 | 
|  | msg = Message(body={'number': 2}) | 
|  | self.sender.send(msg) | 
|  |  | 
|  | def on_link_flow(self, event): | 
|  | if self.receiver_closed: | 
|  | if event.sender: | 
|  | self.drained = event.sender.drained() | 
|  | if self.drained == self.expected_drained: | 
|  | # The sender has been drained. Now create another receiver | 
|  | # to the same address as the sender | 
|  | # and use the sender to send a message to see | 
|  | # if flow is re-issued by the router to the sender and if | 
|  | # the message reaches this newly created receiver | 
|  | self.sender_drained = True | 
|  |  | 
|  | # Create a new receiver | 
|  | self.receiver = event.container.create_receiver( | 
|  | self.receiver_conn, | 
|  | self.dest, | 
|  | name="A") | 
|  |  | 
|  | def run(self): | 
|  | Container(self).run() | 
|  |  | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | unittest.main(main_module()) |