blob: 370dfac1dc139ae3bf9d4aadd50bbce05463a0f8 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, unittest, TestTimeout
class HeartbeatTimer:
def __init__(self, parent):
self.parent = parent
def on_timer_task(self, event):
self.parent.heartbeat()
class RouterTest(TestCase):
inter_router_port = None
@classmethod
def setUpClass(cls):
"""Start a router"""
super(RouterTest, cls).setUpClass()
def router(name, mode, extra=None):
config = [
('router', {'mode': mode, 'id': name, "helloMaxAgeSeconds": '10'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
('address', {'prefix': 'queue', 'waypoint': 'yes'}),
('address', {'prefix': 'multi', 'ingressPhase': '0', 'egressPhase': '9'})
]
if extra:
config.append(extra)
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
cls.routers = []
router('INT.A', 'interior')
def test_01_verify_rx_reject(self):
test = ReceiverRejectTest(self.routers[0].addresses[0])
test.run()
self.assertIsNone(test.error)
def test_02_close_with_no_heartbeats(self):
test = ConnectionCloseTest(self.routers[0].addresses[0], 0)
test.run()
self.assertIsNone(test.error)
def test_03_close_with_two_heartbeats(self):
test = ConnectionCloseTest(self.routers[0].addresses[0], 2)
test.run()
self.assertIsNone(test.error)
class ReceiverRejectTest(MessagingHandler):
def __init__(self, host):
super(ReceiverRejectTest, self).__init__()
self.host = host
self.conn = None
self.receiver = None
self.addr = '_$qd.edge_heartbeat'
self.error = None
def timeout(self):
self.error = "Timeout Expired"
self.conn.close()
def fail(self, error):
self.error = error
self.conn.close()
self.timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.host)
self.receiver = event.container.create_receiver(self.conn, self.addr)
def on_link_closing(self, event):
if event.receiver == self.receiver:
self.fail(None)
def on_link_opened(self, event):
if event.receiver == self.receiver:
if event.receiver.remote_source.address is not None:
self.fail('Heartbeat receiver was unexpectedly opened')
def run(self):
Container(self).run()
class ConnectionCloseTest(MessagingHandler):
def __init__(self, host, count):
super(ConnectionCloseTest, self).__init__()
self.host = host
self.count = count
self.conn = None
self.sender = None
self.addr = '_$qd.edge_heartbeat'
self.error = None
self.pending = False
self.n_tx = 0
def timeout(self):
self.error = "Timeout Expired - n_tx=%d" % self.n_tx
self.conn.close()
if self.hb_timer:
self.hb_timer.cancel()
def fail(self, error):
self.error = error
self.conn.close()
self.timer.cancel()
if self.hb_timer:
self.hb_timer.cancel()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.reactor = event.reactor
self.hb_timer = None
self.conn = event.container.connect(self.host)
self.sender = event.container.create_sender(self.conn, self.addr)
if self.count > 0:
self.heartbeat()
def send_heartbeat(self):
if self.sender.credit > 0 and self.pending:
msg = Message(body=self.n_tx)
self.sender.send(msg)
self.n_tx += 1
self.pending = False
self.hb_timer = self.reactor.schedule(1.0, HeartbeatTimer(self))
def heartbeat(self):
self.hb_timer = None
if self.n_tx < self.count:
self.pending = True
self.send_heartbeat()
def on_sendable(self, event):
if event.sender == self.sender:
self.send_heartbeat()
def on_disconnected(self, event):
self.fail(None)
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())