blob: faaa889e6d62626ecdf5f765927d3d2ba582af34 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from system_test import TestCase, Qdrouterd, main_module
from system_test import unittest, TIMEOUT, TestTimeout
from proton.handlers import MessagingHandler
from proton.reactor import Container
class MaxFrameMaxSessionFramesTest(TestCase):
"""System tests setting proton negotiated size max-frame-size and incoming-window"""
@classmethod
def setUpClass(cls):
super(MaxFrameMaxSessionFramesTest, cls).setUpClass()
name = "MaxFrameMaxSessionFrames"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'maxSessionFrames': '10'}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
def test_max_frame_max_session_frames_max_sessions_default(self):
sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
sniffer.run()
self.assertEqual(2048, sniffer.remote_max_frame)
self.assertEqual(32767, sniffer.remote_channel_max)
class MaxSessionsTest(TestCase):
"""System tests setting proton channel-max"""
@classmethod
def setUpClass(cls):
super(MaxSessionsTest, cls).setUpClass()
name = "MaxSessions"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '10'}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
def test_max_sessions(self):
sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
sniffer.run()
self.assertEqual(9, sniffer.remote_channel_max)
class MaxSessionsZeroTest(TestCase):
"""System tests setting proton channel-max"""
@classmethod
def setUpClass(cls):
super(MaxSessionsZeroTest, cls).setUpClass()
name = "MaxSessionsZero"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '0'}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
def test_max_sessions_zero(self):
sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
sniffer.run()
self.assertEqual(32767, sniffer.remote_channel_max)
class MaxSessionsLargeTest(TestCase):
"""System tests setting proton channel-max"""
@classmethod
def setUpClass(cls):
super(MaxSessionsLargeTest, cls).setUpClass()
name = "MaxSessionsLarge"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '500000'}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
def test_max_sessions_large(self):
sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
sniffer.run()
self.assertEqual(32767, sniffer.remote_channel_max)
class MaxFrameSmallTest(TestCase):
"""System tests setting proton max-frame-size"""
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(MaxFrameSmallTest, cls).setUpClass()
name = "MaxFrameSmall"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '2'}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
def test_max_frame_small(self):
sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
sniffer.run()
self.assertEqual(512, sniffer.remote_max_frame)
class MaxFrameDefaultTest(TestCase):
"""System tests setting proton max-frame-size"""
@classmethod
def setUpClass(cls):
super(MaxFrameDefaultTest, cls).setUpClass()
name = "MaxFrameDefault"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port()}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
def test_max_frame_default(self):
sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
sniffer.run()
self.assertEqual(16384, sniffer.remote_max_frame)
class ConnectorSettingsDefaultTest(TestCase):
"""
The internal logic for protocol settings in listener and connector
is common code. This test makes sure that defaults in the connector
config make it to the wire.
"""
@classmethod
def setUpClass(cls):
super(ConnectorSettingsDefaultTest, cls).setUpClass()
def router(name, client_server, connection):
config = [
('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
('listener', {'port': cls.tester.get_port()}),
connection
]
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
cls.connector_port = cls.tester.get_port()
router('A', 'server',
('connector', {'name': 'testconnector', 'role': 'route-container', 'port': cls.connector_port}))
def test_connector_default(self):
sniffer = ConnectorSettingsSniffer(self.routers[0].connector_addresses[0])
sniffer.run()
self.assertEqual(16384, sniffer.remote_max_frame)
self.assertEqual(32767, sniffer.remote_channel_max)
class ConnectorSettingsNondefaultTest(TestCase):
"""
The internal logic for protocol settings in listener and connector
is common code. This test makes sure that settings in the connector
config make it to the wire.
"""
@classmethod
def setUpClass(cls):
super(ConnectorSettingsNondefaultTest, cls).setUpClass()
def router(name, client_server, connection):
config = [
('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
('listener', {'port': cls.tester.get_port()}),
connection
]
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
cls.connector_port = cls.tester.get_port()
router('A', 'server',
('connector', {'name': 'testconnector', 'role': 'route-container', 'port': cls.connector_port,
'maxFrameSize': '2048', 'maxSessionFrames': '10', 'maxSessions': '20'}))
def test_connector_default(self):
sniffer = ConnectorSettingsSniffer(self.routers[0].connector_addresses[0])
sniffer.run()
self.assertEqual(2048, sniffer.remote_max_frame)
self.assertEqual(19, sniffer.remote_channel_max)
class ProtocolSettingsSniffer(MessagingHandler):
"""Create a Connection/Session/Link and capture the various protocol
settings sent by the peer.
"""
def __init__(self, router_addr, source_addr="xxx", **kwargs):
super(ProtocolSettingsSniffer, self).__init__(**kwargs)
self.router_addr = router_addr
self.source_addr = source_addr
self.conn = None
self.remote_max_frame = None
self.remote_channel_max = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.router_addr)
self.receiver = event.container.create_receiver(self.conn, self.source_addr)
def timeout(self):
self.error = "Timeout Expired - could not connect to router"
self.conn.close()
def on_link_opened(self, event):
tport = event.transport
self.remote_max_frame = tport.remote_max_frame_size
self.remote_channel_max = tport.remote_channel_max
self.conn.close()
self.timer.cancel()
def run(self):
Container(self).run()
class ConnectorSettingsSniffer(MessagingHandler):
"""Similar to ProtocolSettingsSniffer, but for router-initiated connections
"""
def __init__(self, url, **kwargs):
super(ConnectorSettingsSniffer, self).__init__(**kwargs)
self.listener_addr = url
self.acceptor = None
self.remote_max_frame = None
self.remote_channel_max = None
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.acceptor = event.container.listen(self.listener_addr)
def timeout(self):
self.error = "Timeout Expired - router not connecting"
if self.acceptor:
self.acceptor.close()
def on_connection_opened(self, event):
tport = event.transport
self.remote_max_frame = tport.remote_max_frame_size
self.remote_channel_max = tport.remote_channel_max
event.connection.close()
self.acceptor.close()
self.timer.cancel()
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())