#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function


import json
from threading import Timer
from proton import Message
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
from system_test import unittest
from proton.handlers import MessagingHandler
from proton.reactor import Container
from subprocess import PIPE, STDOUT
from qpid_dispatch.management.client import Node

CONNECTION_PROPERTIES = {u'connection': u'properties', u'int_property': 6451}


class AutoLinkDetachAfterAttachTest(MessagingHandler):
    def __init__(self, address, node_addr):
        super(AutoLinkDetachAfterAttachTest, self).__init__(prefetch=0)
        self.timer = None
        self.error = None
        self.conn = None
        self.address = address
        self.n_rx_attach = 0
        self.n_tx_attach = 0
        self.node_addr = node_addr
        self.sender = None
        self.receiver = None

    def timeout(self):
        self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach)
        self.conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)

    def on_link_opened(self, event):
        if event.sender:
            self.sender = event.sender
            self.n_tx_attach += 1
            if event.sender.remote_source.address != self.node_addr:
                self.error = "Expected sender address '%s', got '%s'" % (self.node_addr, event.sender.remote_source.address)
                self.timer.cancel()
                self.conn.close()
        elif event.receiver:
            self.receiver = event.receiver
            self.n_rx_attach += 1
            if event.receiver.remote_target.address != self.node_addr:
                self.error = "Expected receiver address '%s', got '%s'" % (self.node_addr, event.receiver.remote_target.address)
                self.timer.cancel()
                self.conn.close()

        if self.n_tx_attach == 1 and self.n_rx_attach == 1:
            # we have received 2 attaches from the router on the
            # autolink address. Now close the sender and the receiver
            # The router will retry establishing the autolinks.
            self.sender.close()
            self.receiver.close()

        # The router will retry the auto link and the n_tx_attach and
        # n_rx_attach will be 2
        if self.n_tx_attach == 2 and self.n_rx_attach == 2:
            # This if statement will fail if you comment out the call to
            # qdr_route_auto_link_detached_CT(core, link) in
            # qdr_link_inbound_detach_CT() (connections.c)
            self.conn.close()
            self.timer.cancel()

    def run(self):
        Container(self).run()


class DetachAfterAttachTest(TestCase):
    @classmethod
    def setUpClass(cls):
        super(DetachAfterAttachTest, cls).setUpClass()
        name = "test-router"

        config = Qdrouterd.Config([

            ('router', {'mode': 'standalone', 'id': 'A'}),
            ('listener', {'host': '127.0.0.1', 'role': 'normal',
                          'port': cls.tester.get_port()}),

            ('listener', {'role': 'route-container', 'name': 'myListener',
                          'port': cls.tester.get_port()}),

            ('autoLink', {'address': 'myListener.1', 'connection': 'myListener',
                          'direction': 'in'}),
            ('autoLink', {'address': 'myListener.1', 'connection': 'myListener',
                          'direction': 'out'}),
        ])

        cls.router = cls.tester.qdrouterd(name, config)
        cls.router.wait_ready()
        cls.route_address = cls.router.addresses[1]

    def test_auto_link_attach_detach_reattch(self):
        test = AutoLinkDetachAfterAttachTest(self.route_address, 'myListener.1')
        test.run()
        self.assertEqual(None, test.error)


class AutoLinkRetryTest(TestCase):
    inter_router_port = None

    @classmethod
    def router(cls, name, config):
        config = Qdrouterd.Config(config)
        cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

    @classmethod
    def setUpClass(cls):
        super(AutoLinkRetryTest, cls).setUpClass()
        cls.routers = []

        cls.inter_router_port = cls.tester.get_port()

        cls.router('B',
                    [
                        ('router', {'mode': 'standalone', 'id': 'B'}),
                        ('listener', {'role': 'normal',
                                      'port': cls.tester.get_port()}),
                        ('listener', {'host': '127.0.0.1',
                                      'role': 'normal',
                                      'port': cls.inter_router_port}),
                        # Note here that the distribution of the address
                        # 'examples' is set to 'unavailable'
                        # This will ensure that any attach coming in for
                        # this address will be rejected.
                        ('address',
                         {'prefix': 'examples',
                          'name': 'unavailable-address',
                          'distribution': 'unavailable'}),
                    ])

        cls.router('A', [
                        ('router', {'mode': 'standalone', 'id': 'A'}),
                        ('listener', {'host': '127.0.0.1', 'role': 'normal',
                                      'port': cls.tester.get_port()}),

            ('connector', {'host': '127.0.0.1', 'name': 'connectorToB',
                           'role': 'route-container',
                           'port': cls.inter_router_port}),

                        ('autoLink', {'connection': 'connectorToB',
                                      'address': 'examples', 'direction': 'in'}),
                        ('autoLink', {'connection': 'connectorToB',
                                      'address': 'examples', 'direction': 'out'}),
        ])

    def __init__(self, test_method):
        TestCase.__init__(self, test_method)
        self.success = False
        self.timer_delay = 6
        self.max_attempts = 2
        self.attempts = 0

    def address(self):
        return self.routers[1].addresses[0]

    def check_auto_link(self):
        long_type = 'org.apache.qpid.dispatch.router.config.autoLink'
        query_command = 'QUERY --type=' + long_type
        output = json.loads(self.run_qdmanage(query_command))

        if output[0].get('operStatus') == "active":
            self.success = True
        else:
            self.schedule_auto_link_reconnect_test()

        self.attempts += 1

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def can_terminate(self):
        if self.attempts == self.max_attempts:
            return True

        if self.success:
            return True

        return False

    def schedule_auto_link_reconnect_test(self):
        if self.attempts < self.max_attempts:
            if not self.success:
                Timer(self.timer_delay, self.check_auto_link).start()

    def test_auto_link_reattch(self):
        long_type = 'org.apache.qpid.dispatch.router.config.autoLink'
        query_command = 'QUERY --type=' + long_type
        output = json.loads(self.run_qdmanage(query_command))

        # Since the distribution of the autoLinked address 'examples'
        # is set to unavailable, the link route will initially be in the
        # failed state
        self.assertEqual(output[0]['operStatus'], 'failed')
        self.assertEqual(output[0]['lastError'], 'Node not found')

        # Now, we delete the address 'examples' (it becomes available)
        # The Router A must now
        # re-attempt to establish the autoLink and once the  autoLink
        # is up, it should return to the 'active' state.
        delete_command = 'DELETE --type=address --name=unavailable-address'
        self.run_qdmanage(delete_command,  address=self.routers[0].addresses[0])

        self.schedule_auto_link_reconnect_test()

        while not self.can_terminate():
            pass

        self.assertTrue(self.success)


class WaypointReceiverPhaseTest(TestCase):
    inter_router_port = None

    @classmethod
    def router(cls, name, config):
        config = Qdrouterd.Config(config)

        cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

    @classmethod
    def setUpClass(cls):
        super(WaypointReceiverPhaseTest, cls).setUpClass()

        cls.routers = []

        cls.inter_router_port = cls.tester.get_port()
        cls.inter_router_port_1 = cls.tester.get_port()
        cls.backup_port = cls.tester.get_port()
        cls.backup_url = 'amqp://0.0.0.0:' + str(cls.backup_port)

        WaypointReceiverPhaseTest.router('A', [
                        ('router', {'mode': 'interior', 'id': 'A'}),
                        ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}),
                        ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': cls.inter_router_port}),
                        ('autoLink', {'address': '0.0.0.0/queue.ext', 'direction': 'in', 'externalAddress': 'EXT'}),
                        ('autoLink', {'address': '0.0.0.0/queue.ext', 'direction': 'out', 'externalAddress': 'EXT'}),
                        ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),

            ])

        WaypointReceiverPhaseTest.router('B',
                    [
                        ('router', {'mode': 'interior', 'id': 'B'}),
                        ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port()}),
                        ('connector', {'name': 'connectorToB', 'role': 'inter-router',
                                       'port': cls.inter_router_port, 'verifyHostname': 'no'}),
                        ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
                    ])

        cls.routers[1].wait_router_connected('A')

    def test_two_router_waypoint_no_tenant_external_addr_phase(self):
        """
        Attaches two receiver each to one router with an autoLinked address and makes sure that the phase
        on both receivers is set to 1
        :return:
        """
        test = WaypointTest(self.routers[0].addresses[0], self.routers[1].addresses[0], "0.0.0.0/queue.ext")
        test.run()
        self.assertEqual(None, test.error)


class WaypointTest(MessagingHandler):
    def __init__(self, first_host, second_host, dest):
        super(WaypointTest, self).__init__()
        self.first_host = first_host
        self.second_host = second_host
        self.first_conn = None
        self.second_conn = None
        self.error = None
        self.timer = None
        self.receiver1 = None
        self.receiver2 = None
        self.dest = dest
        self.receiver1_phase = False
        self.receiver2_phase = False

    def timeout(self):
        self.error = "The phase on the receiver links were not set to 1"
        self.first_conn.close()
        self.second_conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.first_conn = event.container.connect(self.first_host)
        self.second_conn = event.container.connect(self.second_host)
        self.receiver1 = event.container.create_receiver(self.first_conn, self.dest, name="AAA")
        self.receiver2 = event.container.create_receiver(self.second_conn, self.dest, name="BBB")

    def on_link_opened(self, event):
        if event.receiver == self.receiver1:
            local_node = Node.connect(self.first_host, timeout=TIMEOUT)
            out = local_node.query(type='org.apache.qpid.dispatch.router.link')
            link_type_index = out.attribute_names.index('linkType')
            link_dir_index = out.attribute_names.index('linkDir')
            owning_addr_index = out.attribute_names.index('owningAddr')
            link_name_index = out.attribute_names.index('linkName')

            for result in out.results:
                if result[link_type_index] == "endpoint" and result[link_dir_index] == "out" and result[link_name_index] == 'AAA' and result[owning_addr_index] == 'M10.0.0.0/queue.ext':
                    self.receiver1_phase = True
        elif event.receiver == self.receiver2:
            local_node = Node.connect(self.second_host, timeout=TIMEOUT)
            out = local_node.query(type='org.apache.qpid.dispatch.router.link')
            link_type_index = out.attribute_names.index('linkType')
            link_dir_index = out.attribute_names.index('linkDir')
            owning_addr_index = out.attribute_names.index('owningAddr')
            link_name_index = out.attribute_names.index('linkName')

            for result in out.results:
                if result[link_type_index] == "endpoint" and result[link_dir_index] == "out" and result[link_name_index] == 'BBB' and result[owning_addr_index] == 'M10.0.0.0/queue.ext':
                    self.receiver2_phase = True

        if self.receiver1_phase and self.receiver2_phase:
            self.first_conn.close()
            self.second_conn.close()
            self.timer.cancel()

    def run(self):
        Container(self).run()


class AutolinkTest(TestCase):
    """System tests involving a single router"""
    @classmethod
    def setUpClass(cls):
        """Start a router and a messenger"""
        super(AutolinkTest, cls).setUpClass()
        name = "test-router"
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR'}),

            #
            # Create a general-purpose listener for sending and receiving deliveries
            #
            ('listener', {'port': cls.tester.get_port()}),

            #
            # Create a route-container listener for the autolinks
            #
            ('listener', {'port': cls.tester.get_port(), 'role': 'route-container'}),

            #
            # Create a route-container listener and give it a name myListener.
            # Later on we will create an autoLink which has a connection property of myListener.
            #
            ('listener', {'role': 'route-container', 'name': 'myListener', 'port': cls.tester.get_port()}),

            #
            # Set up the prefix 'node' as a prefix for waypoint addresses
            #
            ('address',  {'prefix': 'node', 'waypoint': 'yes'}),

            #
            # Create a pair of default auto-links for 'node.1'
            #
            ('autoLink', {'address': 'node.1', 'containerId': 'container.1', 'direction': 'in'}),
            ('autoLink', {'address': 'node.1', 'containerId': 'container.1', 'direction': 'out'}),

            #
            # Create a pair of auto-links on non-default phases for container-to-container transfers
            #
            ('autoLink', {'address': 'xfer.2', 'containerId': 'container.2', 'direction': 'in',  'phase': '4'}),
            ('autoLink', {'address': 'xfer.2', 'containerId': 'container.3', 'direction': 'out', 'phase': '4'}),

            #
            # Create a pair of auto-links with a different external address
            # Leave the direction as dir to test backward compatibility.
            #
            ('autoLink', {'address': 'node.2', 'externalAddress': 'ext.2', 'containerId': 'container.4', 'dir': 'in'}),
            ('autoLink', {'address': 'node.2', 'externalAddress': 'ext.2', 'containerId': 'container.4', 'dir': 'out'}),

            #
            # Note here that the connection is set to a previously declared 'myListener'
            #
            ('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'in'}),
            ('autoLink', {'address': 'myListener.1', 'connection': 'myListener', 'direction': 'out'}),

        ])

        cls.router = cls.tester.qdrouterd(name, config)
        cls.router.wait_ready()
        cls.normal_address = cls.router.addresses[0]
        cls.route_address = cls.router.addresses[1]
        cls.ls_route_address = cls.router.addresses[2]

    def run_qdstat_general(self):
        cmd = ['qdstat', '--bus', str(AutolinkTest.normal_address), '--timeout', str(TIMEOUT)] + ['-g']
        p = self.popen(
            cmd,
            name='qdstat-'+self.id(), stdout=PIPE, expect=None,
            universal_newlines=True)

        out = p.communicate()[0]
        assert p.returncode == 0, "qdstat exit status %s, output:\n%s" % (p.returncode, out)
        return out

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK):
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', AutolinkTest.normal_address, '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def test_01_autolink_attach(self):
        """
        Create the route-container connection and verify that the appropriate links are attached.
        Disconnect, reconnect, and verify that the links are re-attached.
        """
        test = AutolinkAttachTest('container.1', self.route_address, 'node.1')
        test.run()
        self.assertEqual(None, test.error)

    def test_02_autolink_credit(self):
        """
        Create a normal connection and a sender to the autolink address.  Then create the route-container
        connection and ensure that the on_sendable did not arrive until after the autolinks were created.
        """
        test = AutolinkCreditTest(self.normal_address, self.route_address)
        test.run()
        self.assertEqual(None, test.error)
        self.assertTrue(test.autolink_count_ok)

    def test_03_autolink_sender(self):
        """
        Create a route-container connection and a normal sender.  Ensure that messages sent on the sender
        link are received by the route container and that settlement propagates back to the sender.
        """
        test = AutolinkSenderTest('container.1', self.normal_address, self.route_address, 'node.1', 'node.1')
        test.run()
        self.assertEqual(None, test.error)

        long_type = 'org.apache.qpid.dispatch.router'
        query_command = 'QUERY --type=' + long_type
        output = json.loads(self.run_qdmanage(query_command))
        self.assertEqual(output[0]['deliveriesEgressRouteContainer'], 275)
        self.assertEqual(output[0]['deliveriesIngressRouteContainer'], 0)
        self.assertEqual(output[0]['deliveriesTransit'], 0)

        self.assertEqual(output[0]['deliveriesIngress'], 277)
        self.assertEqual(output[0]['deliveriesEgress'], 276)

    def test_04_autolink_receiver(self):
        """
        Create a route-container connection and a normal receiver.  Ensure that messages sent from the
        route-container are received by the receiver and that settlement propagates back to the sender.
        """
        test = AutolinkReceiverTest('container.1', self.normal_address, self.route_address, 'node.1', 'node.1')
        test.run()
        self.assertEqual(None, test.error)

        long_type = 'org.apache.qpid.dispatch.router'
        query_command = 'QUERY --type=' + long_type
        output = json.loads(self.run_qdmanage(query_command))
        self.assertEqual(output[0]['deliveriesEgressRouteContainer'], 275)
        self.assertEqual(output[0]['deliveriesIngressRouteContainer'], 275)
        self.assertEqual(output[0]['deliveriesTransit'], 0)

        self.assertEqual(output[0]['deliveriesIngress'], 553)
        self.assertEqual(output[0]['deliveriesEgress'], 552)

    def test_05_inter_container_transfer(self):
        """
        Create a route-container connection and a normal receiver.  Ensure that messages sent from the
        route-container are received by the receiver and that settlement propagates back to the sender.
        """
        test = InterContainerTransferTest(self.normal_address, self.route_address)
        test.run()
        self.assertEqual(None, test.error)

    def test_06_manage_autolinks(self):
        """
        Create a route-container connection and a normal receiver.  Use the management API to create
        autolinks to the route container.  Verify that the links are created.  Delete the autolinks.
        Verify that the links are closed without error.
        """
        test = ManageAutolinksTest(self.normal_address, self.route_address)
        test.run()
        self.assertEqual(None, test.error)

    def test_07_autolink_attach_with_ext_addr(self):
        """
        Create the route-container connection and verify that the appropriate links are attached.
        Disconnect, reconnect, and verify that the links are re-attached.  Verify that the node addresses
        in the links are the configured external address.
        """
        test = AutolinkAttachTest('container.4', self.route_address, 'ext.2')
        test.run()
        self.assertEqual(None, test.error)

    def test_08_autolink_sender_with_ext_addr(self):
        """
        Create a route-container connection and a normal sender.  Ensure that messages sent on the sender
        link are received by the route container and that settlement propagates back to the sender.
        """
        test = AutolinkSenderTest('container.4', self.normal_address, self.route_address, 'node.2', 'ext.2')
        test.run()
        self.assertEqual(None, test.error)

    def test_09_autolink_receiver_with_ext_addr(self):
        """
        Create a route-container connection and a normal receiver.  Ensure that messages sent from the
        route-container are received by the receiver and that settlement propagates back to the sender.
        """
        test = AutolinkReceiverTest('container.4', self.normal_address, self.route_address, 'node.2', 'ext.2')
        test.run()
        self.assertEqual(None, test.error)

    def test_10_autolink_attach_to_listener(self):
        """
        Create two route-container receivers with the same connection name (myListener) and verify that the appropriate
        links over both connections are attached. Disconnect, reconnect, and verify that the links are re-attached.
        """
        test = AutolinkAttachTestWithListenerName(self.ls_route_address, 'myListener.1')
        test.run()
        self.assertEqual(None, test.error)

    def test_11_autolink_multiple_receivers_on_listener(self):
        """
        Create two receivers connecting into a route container listener. Create one sender to a normal listener
        Have the sender send two messages to the address on which the route container listeners are listening and
        make sure that each receiver gets one message.
        """
        test = AutolinkMultipleReceiverUsingMyListenerTest(self.normal_address, self.ls_route_address, 'myListener.1')
        test.run()
        self.assertEqual(None, test.error)


class Timeout(object):
    def __init__(self, parent):
        self.parent = parent

    def on_timer_task(self, event):
        self.parent.timeout()


class AutolinkAttachTestWithListenerName(MessagingHandler):
    def __init__(self, address, node_addr):
        super(AutolinkAttachTestWithListenerName, self).__init__(prefetch=0)
        self.address = address
        self.node_addr = node_addr
        self.error = None
        self.sender = None
        self.receiver = None
        self.timer = None
        self.n_rx_attach = 0
        self.n_tx_attach = 0
        self.conn = None
        self.conn1 = None
        self.conns_reopened = False

    def timeout(self):
        self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach)
        self.conn.close()
        self.conn1.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))

        # We create teo connections to the same listener here and we expect attaches to be sent on both connections
        self.conn = event.container.connect(self.address)
        self.conn1 = event.container.connect(self.address)

    def on_connection_closed(self, event):
        if self.n_tx_attach == 2 and not self.conns_reopened:
            self.conns_reopened = True
            # Re-connect on connection closure
            self.conn = event.container.connect(self.address)
            self.conn1 = event.container.connect(self.address)

    def on_link_opened(self, event):
        if event.sender:
            self.n_tx_attach += 1
            if event.sender.remote_source.address != self.node_addr:
                self.error = "Expected sender address '%s', got '%s'" % (self.node_addr, event.sender.remote_source.address)
                self.timer.cancel()
                self.conn.close()
        elif event.receiver:
            self.n_rx_attach += 1
            if event.receiver.remote_target.address != self.node_addr:
                self.error = "Expected receiver address '%s', got '%s'" % (self.node_addr, event.receiver.remote_target.address)
                self.timer.cancel()
                self.conn.close()

        if self.n_tx_attach == 2 and self.n_rx_attach == 2:
            self.conn.close()
            self.conn1.close()

        if self.n_tx_attach == 4 and self.n_rx_attach == 4:
            self.timer.cancel()
            self.conn.close()
            self.conn1.close()

    def run(self):
        Container(self).run()


class AutolinkAttachTest(MessagingHandler):
    def __init__(self, cid, address, node_addr):
        super(AutolinkAttachTest, self).__init__(prefetch=0)
        self.cid       = cid
        self.address   = address
        self.node_addr = node_addr
        self.error     = None
        self.sender    = None
        self.receiver  = None

        self.n_rx_attach = 0
        self.n_tx_attach = 0

    def timeout(self):
        self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach)
        self.conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn  = event.container.connect(self.address)

    def on_connection_closed(self, event):
        if self.n_tx_attach == 1:
            self.conn  = event.container.connect(self.address)

    def on_link_opened(self, event):
        if event.sender:
            self.n_tx_attach += 1
            if event.sender.remote_source.address != self.node_addr:
                self.error = "Expected sender address '%s', got '%s'" % (self.node_addr, event.sender.remote_source.address)
                self.timer.cancel()
                self.conn.close()
        elif event.receiver:
            self.n_rx_attach += 1
            if event.receiver.remote_target.address != self.node_addr:
                self.error = "Expected receiver address '%s', got '%s'" % (self.node_addr, event.receiver.remote_target.address)
                self.timer.cancel()
                self.conn.close()
        if self.n_tx_attach == 1 and self.n_rx_attach == 1:
            self.conn.close()
        if self.n_tx_attach == 2 and self.n_rx_attach == 2:
            self.conn.close()
            self.timer.cancel()

    def run(self):
        container = Container(self)
        container.container_id = self.cid
        container.run()


class AutolinkCreditTest(MessagingHandler):
    def __init__(self, normal_address, route_address):
        super(AutolinkCreditTest, self).__init__(prefetch=0)
        self.normal_address = normal_address
        self.route_address  = route_address
        self.dest           = 'node.1'
        self.normal_conn    = None
        self.route_conn     = None
        self.error          = None
        self.last_action    = "None"
        self.autolink_count_ok = False

    def timeout(self):
        self.error = "Timeout Expired: last_action=%s" % self.last_action
        if self.normal_conn:
            self.normal_conn.close()
        if self.route_conn:
            self.route_conn.close()

    def on_start(self, event):
        self.timer       = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.normal_conn = event.container.connect(self.normal_address)
        self.sender      = event.container.create_sender(self.normal_conn, self.dest)
        self.last_action = "Attached normal sender"

        local_node = Node.connect(self.normal_address, timeout=TIMEOUT)
        res = local_node.query(type='org.apache.qpid.dispatch.router')
        results = res.results[0]
        attribute_names = res.attribute_names
        if 8 == results[attribute_names.index('autoLinkCount')]:
            self.autolink_count_ok = True

    def on_link_opening(self, event):
        if event.sender:
            event.sender.source.address = event.sender.remote_source.address
        if event.receiver:
            event.receiver.target.address = event.receiver.remote_target.address

    def on_link_opened(self, event):
        if event.sender == self.sender:
            self.route_conn = event.container.connect(self.route_address)
            self.last_action = "Opened route connection"

    def on_sendable(self, event):
        if event.sender == self.sender:
            if self.last_action != "Opened route connection":
                self.error = "Events out of sequence:  last_action=%s" % self.last_action
            self.timer.cancel()
            self.route_conn.close()
            self.normal_conn.close()

    def run(self):
        container = Container(self)
        container.container_id = 'container.1'
        container.run()


class AutolinkSenderTest(MessagingHandler):
    def __init__(self, cid, normal_address, route_address, addr, ext_addr):
        super(AutolinkSenderTest, self).__init__()
        self.cid            = cid
        self.normal_address = normal_address
        self.route_address  = route_address
        self.dest           = addr
        self.ext_addr       = ext_addr
        self.count          = 275
        self.normal_conn    = None
        self.route_conn     = None
        self.error          = None
        self.last_action    = "None"
        self.n_sent         = 0
        self.n_received     = 0
        self.n_settled      = 0

    def timeout(self):
        self.error = "Timeout Expired: last_action=%s n_sent=%d n_received=%d n_settled=%d" % \
                     (self.last_action, self.n_sent, self.n_received, self.n_settled)
        if self.normal_conn:
            self.normal_conn.close()
        if self.route_conn:
            self.route_conn.close()

    def on_start(self, event):
        self.timer       = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.route_conn  = event.container.connect(self.route_address)
        self.last_action = "Connected route container"

    def on_link_opening(self, event):
        if event.sender:
            event.sender.source.address = event.sender.remote_source.address
        if event.receiver:
            event.receiver.target.address = event.receiver.remote_target.address

    def on_link_opened(self, event):
        if event.receiver and not self.normal_conn:
            self.normal_conn = event.container.connect(self.normal_address)
            self.sender      = event.container.create_sender(self.normal_conn, self.dest)
            self.last_action = "Attached normal sender"

    def on_sendable(self, event):
        if event.sender == self.sender:
            while self.n_sent < self.count and event.sender.credit > 0:
                msg = Message(body="AutoLinkTest")
                self.sender.send(msg)
                self.n_sent += 1

    def on_message(self, event):
        self.n_received += 1
        self.accept(event.delivery)

    def on_settled(self, event):
        self.n_settled += 1
        if self.n_settled == self.count:
            self.timer.cancel()
            self.normal_conn.close()
            self.route_conn.close()

    def run(self):
        container = Container(self)
        container.container_id = self.cid
        container.run()


class AutolinkReceiverTest(MessagingHandler):
    def __init__(self, cid, normal_address, route_address, addr, ext_addr):
        super(AutolinkReceiverTest, self).__init__()
        self.cid            = cid
        self.normal_address = normal_address
        self.route_address  = route_address
        self.dest           = addr
        self.ext_addr       = ext_addr
        self.count          = 275
        self.normal_conn    = None
        self.route_conn     = None
        self.error          = None
        self.last_action    = "None"
        self.n_sent         = 0
        self.n_received     = 0
        self.n_settled      = 0

    def timeout(self):
        self.error = "Timeout Expired: last_action=%s n_sent=%d n_received=%d n_settled=%d" % \
                     (self.last_action, self.n_sent, self.n_received, self.n_settled)
        if self.normal_conn:
            self.normal_conn.close()
        if self.route_conn:
            self.route_conn.close()

    def on_start(self, event):
        self.timer       = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.route_conn  = event.container.connect(self.route_address)
        self.last_action = "Connected route container"

    def on_link_opening(self, event):
        if event.sender:
            event.sender.source.address = event.sender.remote_source.address
            self.sender = event.sender
        if event.receiver:
            event.receiver.target.address = event.receiver.remote_target.address

    def on_link_opened(self, event):
        if event.sender and not self.normal_conn:
            self.normal_conn = event.container.connect(self.normal_address)
            self.receiver    = event.container.create_receiver(self.normal_conn, self.dest)
            self.last_action = "Attached normal receiver"

    def on_sendable(self, event):
        if event.sender == self.sender:
            while self.n_sent < self.count and event.sender.credit > 0:
                msg = Message(body="AutoLinkTest")
                self.sender.send(msg)
                self.n_sent += 1

    def on_message(self, event):
        self.n_received += 1
        self.accept(event.delivery)

    def on_settled(self, event):
        self.n_settled += 1
        if self.n_settled == self.count:
            self.timer.cancel()
            self.normal_conn.close()
            self.route_conn.close()

    def run(self):
        container = Container(self)
        container.container_id = self.cid
        container.run()


class AutolinkMultipleReceiverUsingMyListenerTest(MessagingHandler):
    def __init__(self, normal_address, route_address, addr):
        super(AutolinkMultipleReceiverUsingMyListenerTest, self).__init__()
        self.normal_address = normal_address
        self.route_address = route_address
        self.dest = addr
        self.count = 10
        self.normal_conn = None
        self.route_conn1 = None
        self.route_conn2 = None
        self.error = None
        self.last_action = "None"
        self.n_sent = 0
        self.rcv1_received = 0
        self.rcv2_received = 0
        self.n_settled = 0
        self.timer = None
        self.route_conn_rcv1 = None
        self.route_conn_rcv2 = None
        self.n_rx_attach1 = 0
        self.n_rx_attach2 = 0
        self.sender = None
        self.ready_to_send = False

    def timeout(self):
        self.error = "Timeout Expired: messages received by receiver 1=%d messages received by " \
                     "receiver 2=%d" % (self.rcv1_received, self.rcv2_received)
        self.normal_conn.close()
        self.route_conn1.close()
        self.route_conn2.close()

    def on_start(self, event):
        if self.count % 2 != 0:
            self.error = "Count must be a multiple of 2"
            return
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.normal_conn = event.container.connect(self.normal_address)
        self.route_conn1 = event.container.connect(self.route_address)
        self.route_conn2 = event.container.connect(self.route_address)
        self.route_conn_rcv1 = event.container.create_receiver(self.route_conn1, self.dest, name="R1")
        self.route_conn_rcv2 = event.container.create_receiver(self.route_conn2, self.dest, name="R2")

    def on_link_opened(self, event):
        if event.receiver == self.route_conn_rcv1:
            self.n_rx_attach1 += 1
        if event.receiver == self.route_conn_rcv2:
            self.n_rx_attach2 += 1

        if event.sender and event.sender == self.sender:
            self.ready_to_send = True

        if self.n_rx_attach1 == 1 and self.n_rx_attach2 == 1:
            # Both attaches have been received, create a sender
            if not self.sender:
                self.sender = event.container.create_sender(self.normal_conn, self.dest)

    def on_sendable(self, event):
        if self.ready_to_send:
            if self.n_sent < self.count:
                msg = Message(body="AutolinkMultipleReceiverUsingMyListenerTest")
                self.sender.send(msg)
                self.n_sent += 1

    def on_message(self, event):
        if event.receiver == self.route_conn_rcv1:
            self.rcv1_received += 1
        if event.receiver == self.route_conn_rcv2:
            self.rcv2_received += 1

        if (self.rcv1_received + self.rcv2_received == self.count) and \
                self.rcv2_received > 0 and self.rcv1_received > 0:
            self.timer.cancel()
            self.normal_conn.close()
            self.route_conn1.close()
            self.route_conn2.close()

    def run(self):
        container = Container(self)
        container.run()


class InterContainerTransferTest(MessagingHandler):
    def __init__(self, normal_address, route_address):
        super(InterContainerTransferTest, self).__init__()
        self.normal_address = normal_address
        self.route_address  = route_address
        self.count          = 275
        self.conn_1         = None
        self.conn_2         = None
        self.error          = None
        self.n_sent         = 0
        self.n_received     = 0
        self.n_settled      = 0

    def timeout(self):
        self.error = "Timeout Expired:  n_sent=%d n_received=%d n_settled=%d" % \
                     (self.n_sent, self.n_received, self.n_settled)
        self.conn_1.close()
        self.conn_2.close()

    def on_start(self, event):
        self.timer  = event.reactor.schedule(TIMEOUT, Timeout(self))
        event.container.container_id = 'container.2'
        self.conn_1 = event.container.connect(self.route_address)
        event.container.container_id = 'container.3'
        self.conn_2 = event.container.connect(self.route_address)

    def on_link_opening(self, event):
        if event.sender:
            event.sender.source.address = event.sender.remote_source.address
            self.sender = event.sender
        if event.receiver:
            event.receiver.target.address = event.receiver.remote_target.address

    def on_sendable(self, event):
        if event.sender == self.sender:
            while self.n_sent < self.count and event.sender.credit > 0:
                msg = Message(body="AutoLinkTest")
                self.sender.send(msg)
                self.n_sent += 1

    def on_message(self, event):
        self.n_received += 1
        self.accept(event.delivery)

    def on_settled(self, event):
        self.n_settled += 1
        if self.n_settled == self.count:
            self.timer.cancel()
            self.conn_1.close()
            self.conn_2.close()

    def run(self):
        container = Container(self)
        container.run()


class ManageAutolinksTest(MessagingHandler):
    def __init__(self, normal_address, route_address):
        super(ManageAutolinksTest, self).__init__()
        self.normal_address = normal_address
        self.route_address  = route_address
        self.count          = 5
        self.normal_conn    = None
        self.route_conn     = None
        self.error          = None
        self.n_created      = 0
        self.n_attached     = 0
        self.n_deleted      = 0
        self.n_detached     = 0

    def timeout(self):
        self.error = "Timeout Expired: n_created=%d n_attached=%d n_deleted=%d n_detached=%d" % \
                     (self.n_created, self.n_attached, self.n_deleted, self.n_detached)
        self.normal_conn.close()
        self.route_conn.close()

    def on_start(self, event):
        self.timer  = event.reactor.schedule(TIMEOUT, Timeout(self))
        event.container.container_id = 'container.new'
        self.route_conn  = event.container.connect(self.route_address)
        self.normal_conn = event.container.connect(self.normal_address)
        self.reply       = event.container.create_receiver(self.normal_conn, dynamic=True)
        self.agent       = event.container.create_sender(self.normal_conn, "$management");

    def on_link_opening(self, event):
        if event.sender:
            event.sender.source.address = event.sender.remote_source.address
        if event.receiver:
            event.receiver.target.address = event.receiver.remote_target.address

    def on_link_opened(self, event):
        if event.receiver == self.reply:
            self.reply_to = self.reply.remote_source.address
        if event.connection == self.route_conn:
            self.n_attached += 1
            if self.n_attached == self.count:
                self.send_ops()

    def on_link_remote_close(self, event):
        if event.link.remote_condition != None:
            self.error = "Received unexpected error on link-close: %s" % event.link.remote_condition.name
            self.timer.cancel()
            self.normal_conn.close()
            self.route_conn.close()

        self.n_detached += 1
        if self.n_detached == self.count:
            self.timer.cancel()
            self.normal_conn.close()
            self.route_conn.close()

    def send_ops(self):
        if self.n_created < self.count:
            while self.n_created < self.count and self.agent.credit > 0:
                props = {'operation': 'CREATE',
                         'type': 'org.apache.qpid.dispatch.router.config.autoLink',
                         'name': 'AL.%d' % self.n_created }
                body  = {'direction': 'out',
                         'containerId': 'container.new',
                         'address': 'node.%d' % self.n_created }
                msg = Message(properties=props, body=body, reply_to=self.reply_to)
                self.agent.send(msg)
                self.n_created += 1
        elif self.n_attached == self.count and self.n_deleted < self.count:
            while self.n_deleted < self.count and self.agent.credit > 0:
                props = {'operation': 'DELETE',
                         'type': 'org.apache.qpid.dispatch.router.config.autoLink',
                         'name': 'AL.%d' % self.n_deleted }
                body  = {}
                msg = Message(properties=props, body=body, reply_to=self.reply_to)
                self.agent.send(msg)
                self.n_deleted += 1

    def on_sendable(self, event):
        if event.sender == self.agent:
            self.send_ops()

    def on_message(self, event):
        if event.message.properties['statusCode'] // 100 != 2:
            self.error = 'Op Error: %d %s' % (event.message.properties['statusCode'],
                                              event.message.properties['statusDescription'])
            self.timer.cancel()
            self.normal_conn.close()
            self.route_conn.close()

    def run(self):
        container = Container(self)
        container.run()


if __name__ == '__main__':
    unittest.main(main_module())
