blob: f2eab417fbdec19ae20381f2f0726ff665c8446c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from system_test import TestCase, Qdrouterd, main_module, TestTimeout, unittest, TIMEOUT
from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler
from system_tests_drain_support import DrainMessagesMoreHandler
from proton.handlers import MessagingHandler
from proton import Message
from proton.reactor import Container
from time import sleep
class DrainSupportTest(TestCase):
@classmethod
def setUpClass(cls):
"""
Set up two routers:
Router 'test-router' is the system under test.
Router 'broker' acts as a link route sink/source.
The link route uses prefix 'abc'.
"""
super(DrainSupportTest, cls).setUpClass()
test_listener_port = cls.tester.get_port()
broker_listener_port = cls.tester.get_port()
# Configure and start 'broker'
bname = "broker"
bconfig = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'Broker'}),
('listener', {'role': 'normal',
'host': '0.0.0.0', 'port': broker_listener_port, 'linkCapacity': 100, 'saslMechanisms': 'ANONYMOUS'}),
])
cls.broker = cls.tester.qdrouterd(bname, bconfig, wait=True)
# Configure and start test-router
name = "test-router"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
# Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages
('listener', {'role': 'normal',
'host': '0.0.0.0', 'port': test_listener_port,
'linkCapacity': 10, 'saslMechanisms': 'ANONYMOUS'}),
# The DrainMessagesMoreHandler accepts a src/tgt address that may be link-routed.
# This defines the link route to 'broker' and the 'abc' prefix.
('connector', {'name': 'broker1-conn', 'role': 'route-container',
'host': '0.0.0.0', 'port': broker_listener_port,
'saslMechanisms': 'ANONYMOUS'}),
('linkRoute', {'prefix': 'abc', 'direction': 'out', 'connection': 'broker1-conn'}),
('linkRoute', {'prefix': 'abc', 'direction': 'in', 'connection': 'broker1-conn'}),
])
cls.router = cls.tester.qdrouterd(name, config, wait=False)
cls.address = cls.router.addresses[0]
sleep(4) # starting router with wait=True hangs. sleep for now
def test_drain_support_1_all_messages(self):
drain_support = DrainMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
def test_drain_support_2_one_message(self):
drain_support = DrainOneMessageHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
def test_drain_support_3_no_messages(self):
drain_support = DrainNoMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
def test_drain_support_4_no_more_messages(self):
drain_support = DrainNoMoreMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
def test_drain_support_5_drain_then_more_messages_local(self):
drain_support = DrainMessagesMoreHandler(self.address, "org.apache.dev")
drain_support.run()
self.assertEqual(drain_support.error, None)
def test_drain_support_5_drain_then_more_messages_routed(self):
drain_support = DrainMessagesMoreHandler(self.address, "abc")
drain_support.run()
self.assertEqual(drain_support.error, None)
class ReceiverDropsOffDrainTest(TestCase):
@classmethod
def setUpClass(cls):
super(ReceiverDropsOffDrainTest, cls).setUpClass()
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'Broker'}),
('listener', {'role': 'normal',
'port': cls.tester.get_port(),
'saslMechanisms': 'ANONYMOUS'}),
])
cls.router = cls.tester.qdrouterd("A", config, wait=True)
cls.address = cls.router.addresses[0]
def test_receiver_drops_off_sender_receives_drain(self):
test = ReceiverDropsOffSenderDrain(self.address, "examples")
test.run()
self.assertIsNone(test.error)
class ReceiverDropsOffSenderDrain(MessagingHandler):
def __init__(self, address, dest):
super(ReceiverDropsOffSenderDrain, self).__init__()
self.sender_conn = None
self.receiver_conn = None
self.sender = None
self.receiver = None
self.error = None
self.sender_drained = False
self.address = address
self.dest = dest
self.num_msgs = 0
self.receiver_closed = False
self.drained = 0
self.expected_drained = 249
self.sender_drained = False
self.timer = None
# Second receiver link opened.
self.sec_recv_link_opened = False
def timeout(self):
if not self.error:
self.error = "Timeout Expired: Sender was not drained. Expected " \
"drained=%s, actual drain=%s " % \
(self.expected_drained, self.drained)
self.sender_conn.close()
self.receiver_conn.close()
def on_start(self, event):
# Create sender and receiver in two separate connections
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.sender_conn = event.container.connect(self.address)
self.receiver_conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.sender_conn,
self.dest)
self.receiver = event.container.create_receiver(self.receiver_conn,
self.dest)
def on_sendable(self, event):
# Send just one message for now
if self.num_msgs < 1:
self.num_msgs += 1
msg = Message(body={'number': 1})
self.sender.send(msg)
def on_message(self, event):
# As soon as the receiver receives the message, close the receiver
if event.receiver == self.receiver:
if self.sec_recv_link_opened and self.sender_drained:
# Make sure this is the same message body that was
# sent by the newly created receiver
if event.message.body[u'number'] == 3:
self.receiver.close()
self.error = None
self.sender_conn.close()
self.timer.cancel()
self.receiver_conn.close()
else:
self.receiver.close()
self.receiver_closed = True
def on_link_opened(self, event):
if self.sender_drained:
if event.receiver == self.receiver:
self.sec_recv_link_opened = True
if self.num_msgs < 3:
# Send a message after the sender has been drained
# and a new receiver has been created
# and make sure that the message has reached the receiver
self.num_msgs += 1
msg = Message(body={'number': 3})
self.sender.send(msg)
def on_link_closed(self, event):
if event.receiver == self.receiver:
# The receiver link is closed. The sender still have a credit
# of 249. Now send a message. The receiver is already gone at
# this point. The router will receive this message and see that
# there are no receivers and it will send a drain to the sender
# This test will not work without the fix for DISPATCH-1090
self.num_msgs += 1
msg = Message(body={'number': 2})
self.sender.send(msg)
def on_link_flow(self, event):
if self.receiver_closed:
if event.sender:
self.drained = event.sender.drained()
if self.drained == self.expected_drained:
# The sender has been drained. Now create another receiver
# to the same address as the sender
# and use the sender to send a message to see
# if flow is re-issued by the router to the sender and if
# the message reaches this newly created receiver
self.sender_drained = True
# Create a new receiver
self.receiver = event.container.create_receiver(
self.receiver_conn,
self.dest,
name="A")
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())