#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

from time import sleep, time
from threading import Event
from subprocess import PIPE, STDOUT
import socket

from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process, TestTimeout, \
    AsyncTestSender, AsyncTestReceiver, MgmtMsgProxy, unittest, QdManager
from test_broker import FakeBroker
from test_broker import FakeService

from proton import Delivery, symbol
from proton import Message, Condition
from proton.handlers import MessagingHandler
from proton.reactor import AtMostOnce, Container, DynamicNodeProperties, LinkOption, AtLeastOnce
from proton.reactor import ApplicationEvent
from proton.reactor import EventInjector
from proton.utils import BlockingConnection
from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler

from qpid_dispatch.management.client import Node
from qpid_dispatch.management.error import NotFoundStatus, BadRequestStatus


class LinkRouteTest(TestCase):
    """
    Tests the linkRoute property of the dispatch router.

    Sets up 4 routers (two of which are acting as brokers (QDR.A, QDR.D)). The other two routers have linkRoutes
    configured such that matching traffic will be directed to/from the 'fake' brokers.

    (please see configs in the setUpClass method to get a sense of how the routers and their connections are configured)
    The tests in this class send and receive messages across this network of routers to link routable addresses.
    Uses the Python Blocking API to send/receive messages. The blocking api plays neatly into the synchronous nature
    of system tests.

        QDR.A acting broker #1
             +---------+         +---------+         +---------+     +-----------------+
             |         | <------ |         | <-----  |         |<----| blocking_sender |
             |  QDR.A  |         |  QDR.B  |         |  QDR.C  |     +-----------------+
             |         | ------> |         | ------> |         |     +-------------------+
             +---------+         +---------+         +---------+---->| blocking_receiver |
                                    ^  |                             +-------------------+
                                    |  |
                                    |  V
                                 +---------+
                                 |         |
                                 |  QDR.D  |
                                 |         |
                                 +---------+
                            QDR.D acting broker #2

    """
    @classmethod
    def get_router(cls, index):
        return cls.routers[index]

    @classmethod
    def setUpClass(cls):
        """Start three routers"""
        super(LinkRouteTest, cls).setUpClass()

        def router(name, connection):

            config = [
                ('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
            ] + connection

            config = Qdrouterd.Config(config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))

        cls.routers = []
        a_listener_port = cls.tester.get_port()
        b_listener_port = cls.tester.get_port()
        c_listener_port = cls.tester.get_port()
        d_listener_port = cls.tester.get_port()
        test_tag_listener_port = cls.tester.get_port()

        router('A',
               [
                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
               ])
        router('B',
               [
                   # Listener for clients, note that the tests assume this listener is first in this list:
                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
                   ('listener', {'name': 'test-tag', 'role': 'route-container', 'host': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}),

                   # This is an route-container connection made from QDR.B's ephemeral port to a_listener_port
                   ('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
                   # Only inter router communication must happen on 'inter-router' connectors. This connector makes
                   # a connection from the router B's ephemeral port to c_listener_port
                   ('connector', {'name': 'routerC', 'role': 'inter-router', 'host': '0.0.0.0', 'port': c_listener_port}),
                   # This is an on-demand connection made from QDR.B's ephemeral port to d_listener_port
                   ('connector', {'name': 'routerD', 'role': 'route-container', 'host': '0.0.0.0', 'port': d_listener_port, 'saslMechanisms': 'ANONYMOUS'}),

                   #('linkRoute', {'prefix': 'org.apache', 'connection': 'broker', 'direction': 'in'}),
                   ('linkRoute', {'prefix': 'org.apache', 'containerId': 'QDR.A', 'direction': 'in'}),
                   ('linkRoute', {'prefix': 'org.apache', 'containerId': 'QDR.A', 'direction': 'out'}),

                   ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'direction': 'in'}),
                   ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'direction': 'out'}),

                   # addresses matching pattern 'a.*.toA.#' route to QDR.A
                   ('linkRoute', {'pattern': 'a.*.toA.#', 'containerId': 'QDR.A', 'direction': 'in'}),
                   ('linkRoute', {'pattern': 'a.*.toA.#', 'containerId': 'QDR.A', 'direction': 'out'}),

                   # addresses matching pattern 'a.*.toD.#' route to QDR.D
                   # Dont change dir to direction here so we can make sure that the dir attribute is still working.
                   ('linkRoute', {'pattern': 'a.*.toD.#', 'containerId': 'QDR.D', 'dir': 'in'}),
                   ('linkRoute', {'pattern': 'a.*.toD.#', 'containerId': 'QDR.D', 'dir': 'out'})

               ]
               )
        router('C',
               [
                   # The client will exclusively use the following listener to
                   # connect to QDR.C, the tests assume this is the first entry
                   # in the list
                   ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}),
                   ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': c_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
                   # The dot(.) at the end is ignored by the address hashing scheme.

                   ('linkRoute', {'prefix': 'org.apache.', 'direction': 'in'}),
                   ('linkRoute', {'prefix': 'org.apache.', 'direction': 'out'}),

                   ('linkRoute', {'prefix': 'pulp.task', 'direction': 'in'}),
                   ('linkRoute', {'prefix': 'pulp.task', 'direction': 'out'}),

                   ('linkRoute', {'pattern': 'a.*.toA.#', 'direction': 'in'}),
                   ('linkRoute', {'pattern': 'a.*.toA.#', 'direction': 'out'}),

                   ('linkRoute', {'pattern': 'a.*.toD.#', 'direction': 'in'}),
                   ('linkRoute', {'pattern': 'a.*.toD.#', 'direction': 'out'})

               ]
               )
        router('D',  # sink for QDR.D routes
               [
                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': d_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
               ])

        # Wait for the routers to locate each other, and for route propagation
        # to settle
        cls.routers[1].wait_router_connected('QDR.C')
        cls.routers[2].wait_router_connected('QDR.B')
        cls.routers[2].wait_address("org.apache", remotes=1, delay=0.5, count=2)

        # This is not a classic router network in the sense that QDR.A and D are acting as brokers. We allow a little
        # bit more time for the routers to stabilize.
        sleep(2)

    def run_qdstat_linkRoute(self, address, args=None):
        cmd = ['qdstat', '--bus', str(address), '--timeout', str(TIMEOUT)] + ['--linkroute']
        if args:
            cmd = cmd + args
        p = self.popen(
            cmd,
            name='qdstat-' + self.id(), stdout=PIPE, expect=None,
            universal_newlines=True)

        out = p.communicate()[0]
        assert p.returncode == 0, "qdstat exit status %s, output:\n%s" % (p.returncode, out)
        return out

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def test_aaa_qdmanage_query_link_route(self):
        """
        qdmanage converts short type to long type and this test specifically tests if qdmanage is actually doing
        the type conversion correctly by querying with short type and long type.
        """
        cmd = 'QUERY --type=linkRoute'
        out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])

        # Make sure there is a dir of in and out.
        self.assertIn('"direction": "in"', out)
        self.assertIn('"direction": "out"', out)
        self.assertIn('"containerId": "QDR.A"', out)

        # Use the long type and make sure that qdmanage does not mess up the long type
        cmd = 'QUERY --type=org.apache.qpid.dispatch.router.config.linkRoute'
        out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])

        # Make sure there is a dir of in and out.
        self.assertIn('"direction": "in"', out)
        self.assertIn('"direction": "out"', out)
        self.assertIn('"containerId": "QDR.A"', out)

        identity = out[out.find("identity") + 12: out.find("identity") + 13]
        cmd = 'READ --type=linkRoute --identity=' + identity
        out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
        self.assertIn(identity, out)

        exception_occurred = False
        try:
            # This identity should not be found
            cmd = 'READ --type=linkRoute --identity=9999'
            out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
        except Exception as e:
            exception_occurred = True
            self.assertIn("NotFoundStatus: Not Found", str(e))

        self.assertTrue(exception_occurred)

        exception_occurred = False
        try:
            # There is no identity specified, this is a bad request
            cmd = 'READ --type=linkRoute'
            out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
        except Exception as e:
            exception_occurred = True
            self.assertIn("BadRequestStatus: No name or identity provided", str(e))

        self.assertTrue(exception_occurred)

        cmd = 'CREATE --type=autoLink address=127.0.0.1 direction=in connection=routerC'
        out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])

        identity = out[out.find("identity") + 12: out.find("identity") + 14]
        cmd = 'READ --type=autoLink --identity=' + identity
        out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
        self.assertIn(identity, out)

    def test_bbb_qdstat_link_routes_routerB(self):
        """
        Runs qdstat on router B to make sure that router B has 4 link routes,
        each having one 'in' and one 'out' entry

        """
        out = self.run_qdstat_linkRoute(self.routers[1].addresses[0])
        for route in ['a.*.toA.#', 'a.*.toD.#', 'org.apache', 'pulp.task']:
            self.assertIn(route, out)

        out_list = out.split()
        self.assertEqual(out_list.count('in'), 4)
        self.assertEqual(out_list.count('out'), 4)

        parts = out.split("\n")
        self.assertEqual(len(parts), 15)

        out = self.run_qdstat_linkRoute(self.routers[1].addresses[0], args=['--limit=1'])
        parts = out.split("\n")
        self.assertEqual(len(parts), 8)

    def test_ccc_qdstat_link_routes_routerC(self):
        """
        Runs qdstat on router C to make sure that router C has 4 link routes,
        each having one 'in' and one 'out' entry

        """
        out = self.run_qdstat_linkRoute(self.routers[2].addresses[0])
        out_list = out.split()

        self.assertEqual(out_list.count('in'), 4)
        self.assertEqual(out_list.count('out'), 4)

    def test_ddd_partial_link_route_match(self):
        """
        The linkRoute on Routers C and B is set to org.apache.
        Creates a receiver listening on the address 'org.apache.dev' and a sender that sends to address 'org.apache.dev'.
        Sends a message to org.apache.dev via router QDR.C and makes sure that the message was successfully
        routed (using partial address matching) and received using pre-created links that were created as a
        result of specifying addresses in the linkRoute attribute('org.apache.').
        """
        hello_world_1 = "Hello World_1!"

        # Connects to listener #2 on QDR.C
        addr = self.routers[2].addresses[0]

        blocking_connection = BlockingConnection(addr)

        # Receive on org.apache.dev
        blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev")

        apply_options = AtMostOnce()

        # Sender to org.apache.dev
        blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options)
        msg = Message(body=hello_world_1)
        # Send a message
        blocking_sender.send(msg)

        received_message = blocking_receiver.receive()

        self.assertEqual(hello_world_1, received_message.body)

        # Connect to the router acting like the broker (QDR.A) and check the deliveriesIngress and deliveriesEgress
        local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)

        self.assertEqual(u'QDR.A', local_node.query(type='org.apache.qpid.dispatch.router',
                                                    attribute_names=[u'id']).results[0][0])

        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache.dev').deliveriesEgress)
        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache.dev').deliveriesIngress)

        # There should be 4 links -
        # 1. outbound receiver link on org.apache.dev
        # 2. inbound sender link on blocking_sender
        # 3. inbound link to the $management
        # 4. outbound link to $management
        # self.assertEqual(4, len()
        self.assertEqual(4, len(local_node.query(type='org.apache.qpid.dispatch.router.link').results))

        blocking_connection.close()

    def test_partial_link_route_match_1(self):
        """
        This test is pretty much the same as the previous test (test_partial_link_route_match) but the connection is
        made to router QDR.B instead of QDR.C and we expect to see the same behavior.
        """
        hello_world_2 = "Hello World_2!"
        addr = self.routers[1].addresses[0]

        blocking_connection = BlockingConnection(addr)

        # Receive on org.apache.dev
        blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev.1")

        apply_options = AtMostOnce()

        # Sender to  to org.apache.dev
        blocking_sender = blocking_connection.create_sender(address="org.apache.dev.1", options=apply_options)
        msg = Message(body=hello_world_2)
        # Send a message
        blocking_sender.send(msg)

        received_message = blocking_receiver.receive()

        self.assertEqual(hello_world_2, received_message.body)

        local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)

        # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
        # that the message was link routed
        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache.dev.1').deliveriesEgress)

        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache.dev.1').deliveriesIngress)

        blocking_connection.close()

    def test_full_link_route_match(self):
        """
        The linkRoute on Routers C and B is set to org.apache.
        Creates a receiver listening on the address 'org.apache' and a sender that sends to address 'org.apache'.
        Sends a message to org.apache via router QDR.C and makes sure that the message was successfully
        routed (using full address matching) and received using pre-created links that were created as a
        result of specifying addresses in the linkRoute attribute('org.apache.').
        """
        hello_world_3 = "Hello World_3!"
        # Connects to listener #2 on QDR.C
        addr = self.routers[2].addresses[0]

        blocking_connection = BlockingConnection(addr)

        # Receive on org.apache
        blocking_receiver = blocking_connection.create_receiver(address="org.apache")

        apply_options = AtMostOnce()

        # Sender to  to org.apache
        blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options)
        msg = Message(body=hello_world_3)
        # Send a message
        blocking_sender.send(msg)

        received_message = blocking_receiver.receive()

        self.assertEqual(hello_world_3, received_message.body)

        local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)

        # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
        # that the message was link routed
        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache').deliveriesEgress)

        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache').deliveriesIngress)

        blocking_connection.close()

    def _link_route_pattern_match(self, connect_node, include_host,
                                  exclude_host, test_address,
                                  expected_pattern):
        """
        This helper function ensures that messages sent to 'test_address' pass
        through 'include_host', and are *not* routed to 'exclude_host'

        """
        hello_pattern = "Hello Pattern!"
        route = 'M0' + test_address

        # Connect to the two 'waypoints', ensure the route is not present on
        # either

        node_A = Node.connect(include_host, timeout=TIMEOUT)
        node_B = Node.connect(exclude_host, timeout=TIMEOUT)

        for node in [node_A, node_B]:
            self.assertRaises(NotFoundStatus,
                              node.read,
                              type='org.apache.qpid.dispatch.router.address',
                              name=route)

        # wait until the host we're connecting to gets its next hop for the
        # pattern we're connecting to
        connect_node.wait_address(expected_pattern, remotes=1, delay=0.1, count=2)

        # Connect to 'connect_node' and send message to 'address'

        blocking_connection = BlockingConnection(connect_node.addresses[0])
        blocking_receiver = blocking_connection.create_receiver(address=test_address)
        blocking_sender = blocking_connection.create_sender(address=test_address,
                                                            options=AtMostOnce())
        msg = Message(body=hello_pattern)
        blocking_sender.send(msg)
        received_message = blocking_receiver.receive()
        self.assertEqual(hello_pattern, received_message.body)

        # verify test_address is only present on include_host and not on exclude_host

        self.assertRaises(NotFoundStatus,
                          node_B.read,
                          type='org.apache.qpid.dispatch.router.address',
                          name=route)

        self.assertEqual(1, node_A.read(type='org.apache.qpid.dispatch.router.address',
                                        name=route).deliveriesIngress)
        self.assertEqual(1, node_A.read(type='org.apache.qpid.dispatch.router.address',
                                        name=route).deliveriesIngress)

        # drop the connection and verify that test_address is no longer on include_host

        blocking_connection.close()

        timeout = time() + TIMEOUT
        while True:
            try:
                node_A.read(type='org.apache.qpid.dispatch.router.address',
                            name=route)
                if time() > timeout:
                    raise Exception("Expected route '%s' to expire!" % route)
                sleep(0.1)
            except NotFoundStatus:
                break

        node_A.close()
        node_B.close()

    def test_link_route_pattern_match(self):
        """
        Verify the addresses match the proper patterns and are routed to the
        proper 'waypoint' only
        """
        qdr_A = self.routers[0].addresses[0]
        qdr_D = self.routers[3].addresses[0]
        qdr_C = self.routers[2]  # note: the node, not the address!

        self._link_route_pattern_match(connect_node=qdr_C,
                                       include_host=qdr_A,
                                       exclude_host=qdr_D,
                                       test_address='a.notD.toA',
                                       expected_pattern='a.*.toA.#')
        self._link_route_pattern_match(connect_node=qdr_C,
                                       include_host=qdr_D,
                                       exclude_host=qdr_A,
                                       test_address='a.notA.toD',
                                       expected_pattern='a.*.toD.#')
        self._link_route_pattern_match(connect_node=qdr_C,
                                       include_host=qdr_A,
                                       exclude_host=qdr_D,
                                       test_address='a.toD.toA.xyz',
                                       expected_pattern='a.*.toA.#')
        self._link_route_pattern_match(connect_node=qdr_C,
                                       include_host=qdr_D,
                                       exclude_host=qdr_A,
                                       test_address='a.toA.toD.abc',
                                       expected_pattern='a.*.toD.#')

    def test_custom_annotations_match(self):
        """
        The linkRoute on Routers C and B is set to org.apache.
        Creates a receiver listening on the address 'org.apache' and a sender that sends to address 'org.apache'.
        Sends a message with custom annotations to org.apache via router QDR.C and makes sure that the message was successfully
        routed (using full address matching) and received using pre-created links that were created as a
        result of specifying addresses in the linkRoute attribute('org.apache.'). Make sure custom annotations arrived as well.
        """
        hello_world_3 = "Hello World_3!"
        # Connects to listener #2 on QDR.C
        addr = self.routers[2].addresses[0]

        blocking_connection = BlockingConnection(addr)

        # Receive on org.apache
        blocking_receiver = blocking_connection.create_receiver(address="org.apache.2")

        apply_options = AtMostOnce()

        # Sender to  to org.apache
        blocking_sender = blocking_connection.create_sender(address="org.apache.2", options=apply_options)
        msg = Message(body=hello_world_3)
        annotations = {'custom-annotation': '1/Custom_Annotation'}
        msg.annotations = annotations

        # Send a message
        blocking_sender.send(msg)

        received_message = blocking_receiver.receive()

        self.assertEqual(hello_world_3, received_message.body)
        self.assertEqual(received_message.annotations, annotations)

        blocking_connection.close()

    def test_full_link_route_match_1(self):
        """
        This test is pretty much the same as the previous test (test_full_link_route_match) but the connection is
        made to router QDR.B instead of QDR.C and we expect the message to be link routed successfully.
        """
        hello_world_4 = "Hello World_4!"
        addr = self.routers[1].addresses[0]

        blocking_connection = BlockingConnection(addr)

        # Receive on org.apache
        blocking_receiver = blocking_connection.create_receiver(address="org.apache.1")

        apply_options = AtMostOnce()

        # Sender to  to org.apache
        blocking_sender = blocking_connection.create_sender(address="org.apache.1", options=apply_options)

        msg = Message(body=hello_world_4)
        # Send a message
        blocking_sender.send(msg)

        received_message = blocking_receiver.receive()

        self.assertEqual(hello_world_4, received_message.body)

        local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)

        # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. This confirms
        # that the message was link routed
        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache.1').deliveriesEgress)

        self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address',
                                            name='M0org.apache.1').deliveriesIngress)

        blocking_connection.close()

    def test_zzz_qdmanage_delete_link_route(self):
        """
        We are deleting the link route using qdmanage short name. This should be the last test to run
        """

        local_node = Node.connect(self.routers[1].addresses[0], timeout=TIMEOUT)
        res = local_node.query(type='org.apache.qpid.dispatch.router')
        results = res.results[0]
        attribute_list = res.attribute_names

        result_list = local_node.query(type='org.apache.qpid.dispatch.router.config.linkRoute').results
        self.assertEqual(results[attribute_list.index('linkRouteCount')], len(result_list))

        # First delete linkRoutes on QDR.B
        for rid in range(8):
            cmd = 'DELETE --type=linkRoute --identity=' + result_list[rid][1]
            self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])

        cmd = 'QUERY --type=linkRoute'
        out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
        self.assertEqual(out.rstrip(), '[]')

        # linkRoutes now gone on QDR.B but remember that it still exist on QDR.C
        # We will now try to create a receiver on address org.apache.dev on QDR.C.
        # Since the linkRoute on QDR.B is gone, QDR.C
        # will not allow a receiver to be created since there is no route to destination.

        # Connects to listener #2 on QDR.C
        addr = self.routers[2].addresses[0]

        # Now delete linkRoutes on QDR.C to eradicate linkRoutes completely
        local_node = Node.connect(addr, timeout=TIMEOUT)
        result_list = local_node.query(type='org.apache.qpid.dispatch.router.config.linkRoute').results

        # QDR.C has 8 link routes configured, nuke 'em:
        self.assertEqual(8, len(result_list))
        for rid in range(8):
            cmd = 'DELETE --type=linkRoute --identity=' + result_list[rid][1]
            self.run_qdmanage(cmd=cmd, address=addr)

        cmd = 'QUERY --type=linkRoute'
        out = self.run_qdmanage(cmd=cmd, address=addr)
        self.assertEqual(out.rstrip(), '[]')

        res = local_node.query(type='org.apache.qpid.dispatch.router')
        results = res.results[0]
        attribute_list = res.attribute_names
        self.assertEqual(results[attribute_list.index('linkRouteCount')], 0)

        blocking_connection = BlockingConnection(addr, timeout=3)

        # Receive on org.apache.dev (this address used to be linkRouted but not anymore since we deleted linkRoutes
        # on both QDR.C and QDR.B)
        blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev")

        apply_options = AtMostOnce()
        hello_world_1 = "Hello World_1!"
        # Sender to org.apache.dev
        blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options)
        msg = Message(body=hello_world_1)

        # Send a message
        blocking_sender.send(msg)
        received_message = blocking_receiver.receive(timeout=5)
        self.assertEqual(hello_world_1, received_message.body)

    def test_yyy_delivery_tag(self):
        """
        Tests that the router carries over the delivery tag on a link routed delivery
        """
        listening_address = self.routers[1].addresses[1]
        sender_address = self.routers[2].addresses[0]
        qdstat_address = self.routers[2].addresses[0]
        test = DeliveryTagsTest(sender_address, listening_address, qdstat_address)
        test.run()
        self.assertEqual(None, test.error)

    def test_yyy_invalid_delivery_tag(self):
        test = InvalidTagTest(self.routers[2].addresses[0])
        test.run()
        self.assertEqual(None, test.error)

    def test_close_with_unsettled(self):
        test = CloseWithUnsettledTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
        test.run()
        self.assertEqual(None, test.error)

    def test_www_drain_support_all_messages(self):
        drain_support = DrainMessagesHandler(self.routers[2].addresses[0])
        drain_support.run()
        self.assertEqual(None, drain_support.error)

    def test_www_drain_support_one_message(self):
        drain_support = DrainOneMessageHandler(self.routers[2].addresses[0])
        drain_support.run()
        self.assertEqual(None, drain_support.error)

    def test_www_drain_support_no_messages(self):
        drain_support = DrainNoMessagesHandler(self.routers[2].addresses[0])
        drain_support.run()
        self.assertEqual(None, drain_support.error)

    def test_www_drain_support_no_more_messages(self):
        drain_support = DrainNoMoreMessagesHandler(self.routers[2].addresses[0])
        drain_support.run()
        self.assertEqual(None, drain_support.error)

    def test_link_route_terminus_address(self):
        # The receiver is attaching to router B to a listener that has link route for address 'pulp.task' setup.
        listening_address = self.routers[1].addresses[1]
        # Run the query on a normal port
        query_address_listening = self.routers[1].addresses[0]

        # Sender is attaching to router C
        sender_address = self.routers[2].addresses[0]
        query_address_sending = self.routers[2].addresses[0]

        test = TerminusAddrTest(sender_address, listening_address, query_address_sending, query_address_listening)
        test.run()

        self.assertTrue(test.in_receiver_found)
        self.assertTrue(test.out_receiver_found)
        self.assertTrue(test.in_sender_found)
        self.assertTrue(test.out_sender_found)

    def test_dynamic_source(self):
        test = DynamicSourceTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
        test.run()
        self.assertEqual(None, test.error)

    def test_dynamic_target(self):
        test = DynamicTargetTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
        test.run()
        self.assertEqual(None, test.error)

    def test_detach_without_close(self):
        test = DetachNoCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
        test.run()
        self.assertEqual(None, test.error)

    def test_detach_mixed_close(self):
        test = DetachMixedCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
        test.run()
        self.assertEqual(None, test.error)

    def _multi_link_send_receive(self, send_host, receive_host, name):
        senders = ["%s/%s" % (send_host, address) for address in ["org.apache.foo", "org.apache.bar"]]
        receivers = ["%s/%s" % (receive_host, address) for address in ["org.apache.foo", "org.apache.bar"]]
        test = MultiLinkSendReceive(senders, receivers, name)
        test.run()
        self.assertEqual(None, test.error)

    def test_same_name_route_receivers_through_B(self):
        self._multi_link_send_receive(self.routers[0].addresses[0], self.routers[1].addresses[0], "recv_through_B")

    def test_same_name_route_senders_through_B(self):
        self._multi_link_send_receive(self.routers[1].addresses[0], self.routers[0].addresses[0], "send_through_B")

    def test_same_name_route_receivers_through_C(self):
        self._multi_link_send_receive(self.routers[0].addresses[0], self.routers[2].addresses[0], "recv_through_C")

    def test_same_name_route_senders_through_C(self):
        self._multi_link_send_receive(self.routers[2].addresses[0], self.routers[0].addresses[0], "send_through_C")

    def test_echo_detach_received(self):
        """
        Create two receivers to link routed address org.apache.dev
        Create a sender to the same address that the receiver is listening on and send 100 messages.
        After the receivers receive 10 messages each, the receivers will detach and expect to receive ten
        detaches in response.

        """
        test = EchoDetachReceived(self.routers[2].addresses[0], self.routers[2].addresses[0])
        test.run()
        self.assertEqual(None, test.error)

    def test_bad_link_route_config(self):
        """
        What happens when the link route create request is malformed?
        """
        mgmt = self.routers[1].management

        # zero length prefix
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-1",
                          attributes={'prefix': '',
                                      'containerId': 'FakeBroker',
                                      'direction': 'in'})
        # pattern wrong type
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-2",
                          attributes={'pattern': 666,
                                      'containerId': 'FakeBroker',
                                      'direction': 'in'})
        # invalid pattern (no tokens)
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-3",
                          attributes={'pattern': '///',
                                      'containerId': 'FakeBroker',
                                      'direction': 'in'})
        # empty attributes
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-4",
                          attributes={})

        # both pattern and prefix
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-5",
                          attributes={'prefix': 'a1',
                                      'pattern': 'b2',
                                      'containerId': 'FakeBroker',
                                      'direction': 'in'})
        # bad direction
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-6",
                          attributes={'pattern': 'b2',
                                      'containerId': 'FakeBroker',
                                      'direction': 'nowhere'})
        # bad distribution
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-7",
                          attributes={'pattern': 'b2',
                                      'containerId': 'FakeBroker',
                                      'direction': 'in',
                                      "distribution": "dilly dilly"})

        # no direction
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-8",
                          attributes={'prefix': 'b2',
                                      'containerId': 'FakeBroker'})

        # neither pattern nor prefix
        self.assertRaises(BadRequestStatus,
                          mgmt.create,
                          type="org.apache.qpid.dispatch.router.config.linkRoute",
                          name="bad-9",
                          attributes={'direction': 'out',
                                      'containerId': 'FakeBroker'})


class DeliveryTagsTest(MessagingHandler):
    def __init__(self, sender_address, listening_address, qdstat_address):
        super(DeliveryTagsTest, self).__init__()
        self.sender_address = sender_address
        self.listening_address = listening_address
        self.sender = None
        self.receiver_connection = None
        self.sender_connection = None
        self.qdstat_address = qdstat_address
        self.id = '1235'
        self.times = 1
        self.sent = 0
        self.rcvd = 0
        self.delivery_tag_verified = False
        # The delivery tag we are going to send in the transfer frame
        # We will later make sure that the same delivery tag shows up on the receiving end in the link routed case.
        # KAG: force the literal to type 'str' due to SWIG weirdness: on 2.X a
        # delivery tag cannot be unicode (must be binary), but on 3.X it must
        # be unicode!  See https://issues.apache.org/jira/browse/PROTON-1843
        self.delivery_tag = str('92319')
        self.error = None

    def timeout(self):
        self.error = "Timeout expired: sent=%d rcvd=%d" % (self.sent, self.rcvd)
        if self.receiver_connection:
            self.receiver_connection.close()
        if self.sender_connection:
            self.sender_connection.close()

    def on_start(self, event):
        self.timer               = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.receiver_connection = event.container.connect(self.listening_address)

    def on_connection_remote_open(self, event):
        if event.connection == self.receiver_connection:
            continue_loop = True
            # Don't open the sender connection unless we can make sure that there is a remote receiver ready to
            # accept the message.
            # If there is no remote receiver, the router will throw a 'No route to destination' error when
            # creating sender connection.
            # The following loops introduces a wait before creating the sender connection. It gives time to the
            # router so that the address Dpulp.task can show up on the remoteCount
            i = 0
            while continue_loop:
                if i > 100:  # If we have run the read command for more than hundred times and we still do not have
                    # the remoteCount set to 1, there is a problem, just exit out of the function instead
                    # of looping to infinity.
                    self.receiver_connection.close()
                    return
                local_node = Node.connect(self.qdstat_address, timeout=TIMEOUT)
                out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount
                if out == 1:
                    continue_loop = False
                else:
                    i += 1
                    sleep(0.25)

            self.sender_connection = event.container.connect(self.sender_address)
            self.sender = event.container.create_sender(self.sender_connection, "pulp.task", options=AtMostOnce())

    def on_sendable(self, event):
        if self.times == 1:
            msg = Message(body="Hello World")
            self.sender.send(msg, tag=self.delivery_tag)
            self.times += 1
            self.sent += 1

    def on_message(self, event):
        if "Hello World" == event.message.body:
            self.rcvd += 1

        # If the tag on the delivery is the same as the tag we sent with the initial transfer, it means
        # that the router has propagated the delivery tag successfully because of link routing.
        if self.delivery_tag != event.delivery.tag:
            self.error = "Delivery-tag: expected:%r got:%r" % (self.delivery_tag, event.delivery.tag)
        self.receiver_connection.close()
        self.sender_connection.close()
        self.timer.cancel()

    def run(self):
        Container(self).run()


class CloseWithUnsettledTest(MessagingHandler):
    ##
    # This test sends a message across an attach-routed link.  While the message
    # is unsettled, the client link is closed.  The test is ensuring that the
    # router does not crash during the closing of the links.
    ##
    def __init__(self, normal_addr, route_addr):
        super(CloseWithUnsettledTest, self).__init__(prefetch=0, auto_accept=False)
        self.normal_addr = normal_addr
        self.route_addr  = route_addr
        self.dest = "pulp.task.CWUtest"
        self.error = None

    def timeout(self):
        self.error = "Timeout Expired - Check for cores"
        self.conn_normal.close()
        self.conn_route.close()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.conn_route = event.container.connect(self.route_addr)

    def on_connection_opened(self, event):
        if event.connection == self.conn_route:
            self.conn_normal = event.container.connect(self.normal_addr)
        elif event.connection == self.conn_normal:
            self.sender = event.container.create_sender(self.conn_normal, self.dest)

    def on_connection_closed(self, event):
        self.conn_route.close()
        self.timer.cancel()

    def on_link_opened(self, event):
        if event.receiver:
            self.receiver = event.receiver
            self.receiver.flow(1)

    def on_sendable(self, event):
        msg = Message(body="CloseWithUnsettled")
        event.sender.send(msg)

    def on_message(self, event):
        self.conn_normal.close()

    def run(self):
        Container(self).run()


class DynamicSourceTest(MessagingHandler):
    ##
    # This test verifies that a dynamic source can be propagated via link-route to
    # a route-container.
    ##
    def __init__(self, normal_addr, route_addr):
        super(DynamicSourceTest, self).__init__(prefetch=0, auto_accept=False)
        self.normal_addr = normal_addr
        self.route_addr  = route_addr
        self.dest = "pulp.task.DynamicSource"
        self.address = "DynamicSourceAddress"
        self.error = None

    def timeout(self):
        self.error = "Timeout Expired - Check for cores"
        self.conn_normal.close()
        self.conn_route.close()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.conn_route = event.container.connect(self.route_addr)

    def on_connection_opened(self, event):
        if event.connection == self.conn_route:
            self.conn_normal = event.container.connect(self.normal_addr)
        elif event.connection == self.conn_normal:
            self.receiver = event.container.create_receiver(self.conn_normal, None, dynamic=True, options=DynamicNodeProperties({"x-opt-qd.address": u"pulp.task.abc"}))

    def on_link_opened(self, event):
        if event.receiver == self.receiver:
            if self.receiver.remote_source.address != self.address:
                self.error = "Expected %s, got %s" % (self.address, self.receiver.remote_source.address)
            self.conn_normal.close()
            self.conn_route.close()
            self.timer.cancel()

    def on_link_opening(self, event):
        if event.sender:
            self.sender = event.sender
            if not self.sender.remote_source.dynamic:
                self.error = "Expected sender with dynamic source"
                self.conn_normal.close()
                self.conn_route.close()
                self.timer.cancel()
            self.sender.source.address = self.address
            self.sender.open()

    def run(self):
        Container(self).run()


class DynamicTarget(LinkOption):
    def apply(self, link):
        link.target.dynamic = True
        link.target.address = None


class DynamicTargetTest(MessagingHandler):
    ##
    # This test verifies that a dynamic source can be propagated via link-route to
    # a route-container.
    ##
    def __init__(self, normal_addr, route_addr):
        super(DynamicTargetTest, self).__init__(prefetch=0, auto_accept=False)
        self.normal_addr = normal_addr
        self.route_addr  = route_addr
        self.dest = "pulp.task.DynamicTarget"
        self.address = "DynamicTargetAddress"
        self.error = None

    def timeout(self):
        self.error = "Timeout Expired - Check for cores"
        self.conn_normal.close()
        self.conn_route.close()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.conn_route = event.container.connect(self.route_addr)

    def on_connection_opened(self, event):
        if event.connection == self.conn_route:
            self.conn_normal = event.container.connect(self.normal_addr)
        elif event.connection == self.conn_normal:
            self.sender = event.container.create_sender(self.conn_normal, None, options=[DynamicTarget(), DynamicNodeProperties({"x-opt-qd.address": u"pulp.task.abc"})])

    def on_link_opened(self, event):
        if event.sender == self.sender:
            if self.sender.remote_target.address != self.address:
                self.error = "Expected %s, got %s" % (self.address, self.receiver.remote_source.address)
            self.conn_normal.close()
            self.conn_route.close()
            self.timer.cancel()

    def on_link_opening(self, event):
        if event.receiver:
            self.receiver = event.receiver
            if not self.receiver.remote_target.dynamic:
                self.error = "Expected receiver with dynamic source"
                self.conn_normal.close()
                self.conn_route.close()
                self.timer.cancel()
            self.receiver.target.address = self.address
            self.receiver.open()

    def run(self):
        Container(self).run()


class DetachNoCloseTest(MessagingHandler):
    ##
    # This test verifies that link-detach (not close) is propagated properly
    ##
    def __init__(self, normal_addr, route_addr):
        super(DetachNoCloseTest, self).__init__(prefetch=0, auto_accept=False)
        self.normal_addr = normal_addr
        self.route_addr  = route_addr
        self.dest = "pulp.task.DetachNoClose"
        self.error = None

    def timeout(self):
        self.error = "Timeout Expired - Check for cores"
        self.conn_normal.close()
        self.conn_route.close()

    def stop(self):
        self.conn_normal.close()
        self.conn_route.close()
        self.timer.cancel()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.conn_route = event.container.connect(self.route_addr)

    def on_connection_opened(self, event):
        if event.connection == self.conn_route:
            self.conn_normal = event.container.connect(self.normal_addr)
        elif event.connection == self.conn_normal:
            self.receiver = event.container.create_receiver(self.conn_normal, self.dest)

    def on_link_opened(self, event):
        if event.receiver == self.receiver:
            self.receiver.detach()

    def on_link_remote_detach(self, event):
        if event.sender == self.sender:
            self.sender.detach()
        if event.receiver == self.receiver:
            ##
            # Test passed, we expected a detach on the propagated sender and back
            ##
            self.stop()

    def on_link_closing(self, event):
        if event.sender == self.sender:
            self.error = 'Propagated link was closed.  Expected it to be detached'
            self.stop()

        if event.receiver == self.receiver:
            self.error = 'Client link was closed.  Expected it to be detached'
            self.stop()

    def on_link_opening(self, event):
        if event.sender:
            self.sender = event.sender
            self.sender.source.address = self.sender.remote_source.address
            self.sender.open()

    def run(self):
        Container(self).run()


class DetachMixedCloseTest(MessagingHandler):
    ##
    # This test verifies that link-detach (not close) is propagated properly
    ##
    def __init__(self, normal_addr, route_addr):
        super(DetachMixedCloseTest, self).__init__(prefetch=0, auto_accept=False)
        self.normal_addr = normal_addr
        self.route_addr  = route_addr
        self.dest = "pulp.task.DetachMixedClose"
        self.error = None

    def timeout(self):
        self.error = "Timeout Expired - Check for cores"
        self.conn_normal.close()
        self.conn_route.close()

    def stop(self):
        self.conn_normal.close()
        self.conn_route.close()
        self.timer.cancel()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.conn_route = event.container.connect(self.route_addr)

    def on_connection_opened(self, event):
        if event.connection == self.conn_route:
            self.conn_normal = event.container.connect(self.normal_addr)
        elif event.connection == self.conn_normal:
            self.receiver = event.container.create_receiver(self.conn_normal, self.dest)

    def on_link_opened(self, event):
        if event.receiver == self.receiver:
            self.receiver.detach()

    def on_link_remote_detach(self, event):
        if event.sender == self.sender:
            self.sender.close()
        if event.receiver == self.receiver:
            self.error = 'Client link was detached.  Expected it to be closed'
            self.stop()

    def on_link_closing(self, event):
        if event.sender == self.sender:
            self.error = 'Propagated link was closed.  Expected it to be detached'
            self.stop()

        if event.receiver == self.receiver:
            ##
            # Test Passed
            ##
            self.stop()

    def on_link_opening(self, event):
        if event.sender:
            self.sender = event.sender
            self.sender.source.address = self.sender.remote_source.address
            self.sender.open()

    def run(self):
        Container(self).run()


# Test to validate fix for DISPATCH-927
class EchoDetachReceived(MessagingHandler):
    def __init__(self, sender_address, recv_address):
        super(EchoDetachReceived, self).__init__()
        self.sender_address = sender_address
        self.recv_address = recv_address
        self.dest = "org.apache.dev"
        self.num_msgs = 100
        self.num_receivers = 10
        self.msgs_sent = 0
        self.receiver_conn = None
        self.sender_conn = None
        self.sender = None
        self.receiver_dict = {}
        self.error = None
        self.receiver_attaches = 0
        self.timer = None
        self.sender_attached = False
        self.received_msgs_dict = {}
        self.receiver_detach_dict = {}
        self.num_detaches_echoed = 0

    @property
    def msgs_received(self):
        return sum(self.received_msgs_dict.values())

    def timeout(self):

        self.bail("Timeout Expired: msgs_sent=%d msgs_received=%d, number of detaches received=%d"
                  % (self.msgs_sent, self.msgs_received, self.num_detaches_echoed))

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))

        # Create two separate connections for sender and receivers
        self.receiver_conn = event.container.connect(self.recv_address)
        self.sender_conn = event.container.connect(self.sender_address)
        for i in range(self.num_receivers):
            name = "R%d" % i
            self.receiver_dict[name] = event.container.create_receiver(self.receiver_conn, self.dest, name=name)
            self.received_msgs_dict[name] = 0

    def bail(self, text=None):
        self.error = text
        self.sender_conn.close()
        self.receiver_conn.close()
        self.timer.cancel()

    def on_link_opened(self, event):
        if event.receiver:
            if event.receiver.name in list(self.receiver_dict):
                self.receiver_attaches += 1
            # The response receiver attaches have been received. The receiver sent attaches which was link routed
            # all the way to the 'broker' router and the response attaches have come back.
            # It is now time to create the sender.
            if self.receiver_attaches == self.num_receivers:
                self.sender = event.container.create_sender(self.sender_conn, self.dest)

        elif event.sender:
            if not self.sender_attached:
                if event.sender == self.sender:
                    # The sender attaches were link routed as well and the response attach has been received.
                    self.sender_attached = True

    def on_sendable(self, event):
        # The sender will send 100 messages
        if self.receiver_attaches == self.num_receivers and self.sender_attached:
            if self.msgs_sent < self.num_msgs:
                msg = Message(body="Hello World")
                self.sender.send(msg)
                self.msgs_sent += 1

    def on_message(self, event):
        if event.receiver and event.receiver.name in list(self.receiver_dict):
            self.received_msgs_dict[event.receiver.name] += 1

        if sum(self.received_msgs_dict.values()) == self.num_msgs:
            # The receivers have received a total of 100 messages. Close the receivers. The detach sent by these
            # receivers will travel all the way over the link route and the 'broker' router will respond with a
            # detach
            for receiver in list(self.receiver_dict):
                self.receiver_dict[receiver].close()

    def on_link_closed(self, event):
        if event.receiver.name in list(self.receiver_dict) and event.receiver.name not in list(self.receiver_detach_dict):
            self.receiver_detach_dict[event.receiver.name] = event.receiver
            self.num_detaches_echoed += 1

        # Terminate the test only if both detach frames have been received.
        if all(receiver in list(self.receiver_detach_dict) for receiver in list(self.receiver_dict)):
            self.bail()

    def run(self):
        Container(self).run()


class TerminusAddrTest(MessagingHandler):
    """
    This tests makes sure that the link route address is visible in the output of qdstat -l command.

    Sets up a sender on address pulp.task.terminusTestSender and a receiver on pulp.task.terminusTestReceiver.
    Connects to the router to which the sender is attached and makes sure that the pulp.task.terminusTestSender address
    shows up with an 'in' and 'out'
    Similarly connects to the router to which the receiver is attached and makes sure that the
    pulp.task.terminusTestReceiver address shows up with an 'in' and 'out'

    """

    def __init__(self, sender_address, listening_address, query_address_sending, query_address_listening):
        super(TerminusAddrTest, self).__init__()
        self.sender_address = sender_address
        self.listening_address = listening_address
        self.sender = None
        self.receiver = None
        self.message_received = False
        self.receiver_connection = None
        self.sender_connection = None
        # We will run a query on the same router where the sender is attached
        self.query_address_sending = query_address_sending

        # We will run a query on the same router where the receiver is attached
        self.query_address_listening = query_address_listening
        self.count = 0

        self.in_receiver_found = False
        self.out_receiver_found = False
        self.in_sender_found = False
        self.out_sender_found = False

        self.receiver_link_opened = False
        self.sender_link_opened = False

    def on_start(self, event):
        self.receiver_connection = event.container.connect(self.listening_address)

    def on_connection_remote_open(self, event):
        if event.connection == self.receiver_connection:
            continue_loop = True
            # The following loops introduces a wait. It gives time to the
            # router so that the address Dpulp.task can show up on the remoteCount
            i = 0
            while continue_loop:
                if i > 100:  # If we have run the read command for more than hundred times and we still do not have
                    # the remoteCount set to 1, there is a problem, just exit out of the function instead
                    # of looping to infinity.
                    self.receiver_connection.close()
                    return
                local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT)
                out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount
                if out == 1:
                    continue_loop = False
                i += 1
                sleep(0.25)

            self.sender_connection = event.container.connect(self.sender_address)

            # Notice here that the receiver and sender are listening on different addresses. Receiver on
            # pulp.task.terminusTestReceiver and the sender on pulp.task.terminusTestSender
            self.receiver = event.container.create_receiver(self.receiver_connection, "pulp.task.terminusTestReceiver")
            self.sender = event.container.create_sender(self.sender_connection, "pulp.task.terminusTestSender", options=AtMostOnce())

    def on_link_opened(self, event):
        if event.receiver == self.receiver:
            self.receiver_link_opened = True

            local_node = Node.connect(self.query_address_listening, timeout=TIMEOUT)
            out = local_node.query(type='org.apache.qpid.dispatch.router.link')

            link_dir_index = out.attribute_names.index("linkDir")
            owning_addr_index = out.attribute_names.index("owningAddr")

            # Make sure that the owningAddr M0pulp.task.terminusTestReceiver shows up on both in and out.
            # The 'out' link is on address M0pulp.task.terminusTestReceiver outgoing from the router B to the receiver
            # The 'in' link is on address M0pulp.task.terminusTestReceiver incoming from router C to router B
            for result in out.results:
                if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver':
                    self.in_receiver_found = True
                if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver':
                    self.out_receiver_found = True

        if event.sender == self.sender:
            self.sender_link_opened = True

            local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT)
            out = local_node.query(type='org.apache.qpid.dispatch.router.link')

            link_dir_index = out.attribute_names.index("linkDir")
            owning_addr_index = out.attribute_names.index("owningAddr")

            # Make sure that the owningAddr M0pulp.task.terminusTestSender shows up on both in and out.
            # The 'in' link is on address M0pulp.task.terminusTestSender incoming from sender to router
            # The 'out' link is on address M0pulp.task.terminusTestSender outgoing from router C to router B
            for result in out.results:
                if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender':
                    self.in_sender_found = True
                if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender':
                    self.out_sender_found = True

        # Shutdown the connections only if the on_link_opened has been called for sender and receiver links.
        if self.sender_link_opened and self.receiver_link_opened:
            self.sender.close()
            self.receiver.close()
            self.sender_connection.close()
            self.receiver_connection.close()

    def run(self):
        Container(self).run()


class MultiLinkSendReceive(MessagingHandler):
    class SendState(object):
        def __init__(self, link):
            self.link = link
            self.sent = False
            self.accepted = False
            self.done = False
            self.closed = False

        def send(self, subject, body):
            if not self.sent:
                self.link.send(Message(subject=subject, body=body, address=self.link.target.address))
                self.sent = True

        def on_accepted(self):
            self.accepted = True
            self.done = True

        def close(self):
            if not self.closed:
                self.closed = True
                self.link.close()
                self.link.connection.close()

    class RecvState(object):
        def __init__(self, link):
            self.link = link
            self.received = False
            self.done = False
            self.closed = False

        def on_message(self):
            self.received = True
            self.done = True

        def close(self):
            if not self.closed:
                self.closed = True
                self.link.close()
                self.link.connection.close()

    def __init__(self, send_urls, recv_urls, name, message=None):
        super(MultiLinkSendReceive, self).__init__()
        self.send_urls = send_urls
        self.recv_urls = recv_urls
        self.senders = {}
        self.receivers = {}
        self.message = message or "SendReceiveTest"
        self.sent = False
        self.error = None
        self.name = name

    def close(self):
        for sender in self.senders.values():
            sender.close()
        for receiver in self.receivers.values():
            receiver.close()

    def all_done(self):
        for sender in self.senders.values():
            if not sender.done:
                return False
        for receiver in self.receivers.values():
            if not receiver.done:
                return False
        return True

    def timeout(self):
        self.error = "Timeout Expired"
        self.close()

    def stop_if_all_done(self):
        if self.all_done():
            self.stop()

    def stop(self):
        self.close()
        self.timer.cancel()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        event.container.container_id = None
        for u in self.send_urls:
            s = self.SendState(event.container.create_sender(u, name=self.name))
            self.senders[s.link.connection.container] = s
        for u in self.recv_urls:
            r = self.RecvState(event.container.create_receiver(u, name=self.name))
            self.receivers[r.link.connection.container] = r

    def on_sendable(self, event):
        self.senders[event.connection.container].send(self.name, self.message)

    def on_message(self, event):
        if self.message != event.message.body:
            error = "Incorrect message. Got %s, expected %s" % (event.message.body, self.message.body)
        self.receivers[event.connection.container].on_message()
        self.stop_if_all_done()

    def on_accepted(self, event):
        self.senders[event.connection.container].on_accepted()
        self.stop_if_all_done()

    def run(self):
        Container(self).run()


class LinkRouteProtocolTest(TestCase):
    """
    Test link route implementation against "misbehaving" containers

    Uses a custom fake broker (not a router) that can do weird things at the
    protocol level.

             +-------------+         +---------+         +-----------------+
             |             | <------ |         | <-----  | blocking_sender |
             | fake broker |         |  QDR.A  |         +-----------------+
             |             | ------> |         | ------> +-------------------+
             +-------------+         +---------+         | blocking_receiver |
                                                         +-------------------+
    """
    @classmethod
    def setUpClass(cls):
        """Configure and start QDR.A"""
        super(LinkRouteProtocolTest, cls).setUpClass()
        config = [
            ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
            # for client connections:
            ('listener', {'role': 'normal',
                          'host': '0.0.0.0',
                          'port': cls.tester.get_port(),
                          'saslMechanisms': 'ANONYMOUS'}),
            # to connect to the fake broker
            ('connector', {'name': 'broker',
                           'role': 'route-container',
                           'host': '127.0.0.1',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),

            # forward 'org.apache' messages to + from fake broker:
            ('linkRoute', {'prefix': 'org.apache', 'containerId': 'FakeBroker', 'direction': 'in'}),
            ('linkRoute', {'prefix': 'org.apache', 'containerId': 'FakeBroker', 'direction': 'out'})
        ]
        config = Qdrouterd.Config(config)
        cls.router = cls.tester.qdrouterd('A', config, wait=False)

    def _fake_broker(self, cls):
        """Spawn a fake broker listening on the broker's connector
        """
        fake_broker = cls(self.router.connector_addresses[0])
        # wait until the connection to the fake broker activates
        self.router.wait_connectors()
        return fake_broker

    def test_DISPATCH_1092(self):
        # This fake broker will force the session closed after the link
        # detaches.  Verify that the session comes back up correctly when the
        # next client attaches
        killer = self._fake_broker(SessionKiller)
        for i in range(2):
            bconn = BlockingConnection(self.router.addresses[0])
            bsender = bconn.create_sender(address="org.apache",
                                          options=AtLeastOnce())
            msg = Message(body="Hey!")
            bsender.send(msg)
            bsender.close()
            bconn.close()
        killer.join()


class SessionKiller(FakeBroker):
    """DISPATCH-1092: force a session close when the link closes.  This should
    cause the router to re-create the session when the next client attaches.
    """

    def __init__(self, url):
        super(SessionKiller, self).__init__(url)

    def on_link_closing(self, event):
        event.link.close()
        event.session.close()


class FakeBrokerDrain(FakeBroker):
    """
    DISPATCH-1496 - Make sure that the router does not grant additional credit
    when drain is issued by a receiver connected to the router on a
    link routed address
    """

    def __init__(self, url):
        super(FakeBrokerDrain, self).__init__(url)
        self.first_flow_received = False
        self.first_drain_mode = False
        self.second_drain_mode = False
        self.error = None
        self.num_flows = 0
        self.success = False

    def on_link_flow(self, event):
        if event.link.is_sender:
            if event.sender.drain_mode:
                if not self.first_drain_mode:
                    self.first_drain_mode = True
                    event.sender.drained()
                elif not self.second_drain_mode:
                    self.second_drain_mode = True
                    if event.link.credit == 1000:
                        # Without the patch for DISPATCH-1496,
                        # the event.link.credit value would be 2000
                        self.success = True
                    else:
                        self.success = False
                    event.sender.drained()
            else:
                if not self.first_flow_received:
                    self.first_flow_received = True
                    msg = Message(body="First Drain Transfer")
                    event.link.send(msg)


class DrainReceiver(MessagingHandler):
    def __init__(self, url, fake_broker):
        super(DrainReceiver, self).__init__(prefetch=0, auto_accept=False)
        self.url = url
        self.received = 0
        self.receiver = None
        self.first_drain_sent = False
        self.second_drain_sent = False
        self.first_flow_sent = False
        self.receiver_conn = None
        self.error = None
        self.num_flows = 0
        self.fake_broker = fake_broker

    def on_start(self, event):
        self.receiver_conn = event.container.connect(self.url)
        self.receiver = event.container.create_receiver(self.receiver_conn, "org.apache")

        # Step 1: Send a flow of 1000 to the router. The router will forward this
        #   flow to the FakeBroker
        self.receiver.flow(1000)
        self.first_flow_sent = True

    def on_link_flow(self, event):
        if event.receiver == self.receiver:
            self.num_flows += 1
            if self.num_flows == 1:
                # Step 4: The response drain received from the FakeBroker
                # Step 5: Send second flow of 1000 credits. This is forwarded to the FakeBroker
                self.receiver.flow(1000)
                self.timer = event.reactor.schedule(3, TestTimeout(self))
            elif self.num_flows == 2:
                if not self.fake_broker.success:
                    self.error = "The FakeBroker did not receive correct credit of 1000"
                self.receiver_conn.close()

    def timeout(self):
        # Step 6: The second drain is sent to the router. The router was forwarding the wrong credit (2000) to the FakeBroker
        # but with the fix for DISPATCH-1496, the correct credit is forwarded (1000)
        self.receiver.drain(0)
        self.second_drain_sent = True

    def on_message(self, event):
        if event.receiver == self.receiver:
            self.received += 1

            # Step 2: In response to Step 1, the broker has sent the only message in its queue
            if self.received == 1:
                self.first_drain_sent = True
                #print ("First message received. Doing first drain")
                # Step 3: The receiver drains after receiving the first message.
                # This drain is forwarded to the FakeBroker
                self.receiver.drain(0)

    def run(self):
        Container(self).run()


class LinkRouteDrainTest(TestCase):
    """
    Test link route drain implementation.
    DISPATCH-1496 alleges that the router is granting extra credit when
    forwarding the drain.

    Uses a router which connects to a FakeBroker (FB)

             +-------------+         +---------+
             |             | <------ |         |
             | fake broker |         |  QDR.A  |
             |             | ------> |         | ------> +-------------------+
             +-------------+         +---------+         | receiver          |
                                                         +-------------------+
    The router will grant extra credit when the following sequence is used
    1. The receiver attaches to the router on a a link routed address called "org.apache"
    2. Receiver issues a flow of 1000. The FakeBroker has only one message in its
       "examples" queue and it sends it over to the router which forwards it to the receiver
    3. After receiving the message the receiver issues a drain(0). This drain is
       forwarded to the FakeBroker by the router and the FB responds. There
       is not problem with this drain
    4. The receiver again gives a flow of 1000 and it is forwarded to the FB. There
       are no messages in the broker queue, so the FB sends no messages
    5. The receiver again issues a drain(0). At this time, without the fix for
       DISPATCH-1496, the router issues double the credit to the FB. Instead
       of issuing a credit of 1000, it issues a credit of 2000.
    """
    @classmethod
    def setUpClass(cls):
        """Configure and start QDR.A"""
        super(LinkRouteDrainTest, cls).setUpClass()
        config = [
            ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
            # for client connections:
            ('listener', {'role': 'normal',
                          'host': '0.0.0.0',
                          'port': cls.tester.get_port(),
                          'saslMechanisms': 'ANONYMOUS'}),
            # to connect to the fake broker
            ('connector', {'name': 'broker',
                           'role': 'route-container',
                           'host': '127.0.0.1',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),

            # forward 'org.apache' messages to + from fake broker:
            ('linkRoute', {'prefix': 'org.apache', 'containerId': 'FakeBroker', 'direction': 'in'}),
            ('linkRoute', {'prefix': 'org.apache', 'containerId': 'FakeBroker', 'direction': 'out'})
        ]
        config = Qdrouterd.Config(config)
        cls.router = cls.tester.qdrouterd('A', config, wait=False)

    def _fake_broker(self, cls):
        """Spawn a fake broker listening on the broker's connector
        """
        fake_broker = cls(self.router.connector_addresses[0])
        # wait until the connection to the fake broker activates
        self.router.wait_connectors()
        return fake_broker

    def test_DISPATCH_1496(self):
        fake_broker = self._fake_broker(FakeBrokerDrain)
        drain_receiver = DrainReceiver(self.router.addresses[0], fake_broker)
        drain_receiver.run()
        self.assertEqual(drain_receiver.error, None)


class EmptyTransferTest(TestCase):
    @classmethod
    def setUpClass(cls):
        super(EmptyTransferTest, cls).setUpClass()
        cls.ROUTER_LISTEN_PORT = cls.tester.get_port()

        config = [
            ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
            # the client will connect to this listener
            ('listener', {'role': 'normal',
                          'host': '0.0.0.0',
                          'port': cls.ROUTER_LISTEN_PORT,
                          'saslMechanisms': 'ANONYMOUS'}),
            # to connect to the fake broker
            ('connector', {'name': 'broker',
                           'role': 'route-container',
                           'host': '127.0.0.1',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
            ('linkRoute',
             {'prefix': 'examples', 'containerId': 'FakeBroker',
              'direction': 'in'}),
            ('linkRoute',
             {'prefix': 'examples', 'containerId': 'FakeBroker',
              'direction': 'out'})
        ]
        config = Qdrouterd.Config(config)
        cls.router = cls.tester.qdrouterd('A', config, wait=False)

    def _fake_broker(self, cls):
        """
        Spawn a fake broker listening on the broker's connector
        """
        fake_broker = cls(self.router.connector_addresses[0])
        # wait until the connection to the fake broker activates
        self.router.wait_connectors()
        return fake_broker

    def test_DISPATCH_1988(self):
        fake_broker = self._fake_broker(FakeBroker)
        AMQP_OPEN_BEGIN_ATTACH = bytearray(
            b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00'
            b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06'
            b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21'
            b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00'
            b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00'
            b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b'
            b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72'
            b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b'
            b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0'
            b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70'
            b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00')

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Connect to the router listening port and send an amqp, open,
        # begin, attach. The attach is sent on the link
        # routed address, "examples"
        s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT))
        s.sendall(AMQP_OPEN_BEGIN_ATTACH)

        # Give a second for the attach to propagate to the broker and
        # for the broker to send a response attach
        sleep(1)
        data = s.recv(2048)
        self.assertIn("examples", repr(data))

        # First send a message on link routed address "examples" with
        # message body of "message 0"
        # Verify the the sent message has been accepted.
        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01'
                               + b'\xa0\x01\x01\x43\x42'
                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
                               + b'\x65\x20\x30')
        s.sendall(TRANSFER_1)
        sleep(0.5)
        data = s.recv(1024)
        # The delivery has been accepted.
        self.assertIn("x00S$E", repr(data))

        # Test case 1
        # Send an empty transfer frame to the router and you should
        # receive a rejected disposition from the router.
        # Without the fix for DISPATCH_1988,
        # upon sending this EMPTY_TRANSFER
        # the router crashes with the following assert
        # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed.
        # This is the empty transfer frame that is sent to the router.
        # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false]
        EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00'
                                   + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52'
                                   + b'\x02\xa0\x01\x02\x43\x42'
                                   + b'\x42\x40\x40\x40\x40\x42')
        s.sendall(EMPTY_TRANSFER)
        sleep(1)
        data = s.recv(1024)
        # The delivery has been rejected.
        self.assertIn("x00S%E", repr(data))

        # Let's send another transfer to make sure that the
        # router has not crashed.
        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03'
                               + b'\xa0\x01\x03\x43\x42'
                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
                               + b'\x65\x20\x30')
        s.sendall(TRANSFER_1)
        sleep(0.5)
        data = s.recv(1024)
        # The delivery has been accepted.
        self.assertIn("x00S$E", repr(data))

        # Test case 2
        # Now, send two empty transfer frames, first transfer has
        # more=true and the next transfer has more=false.
        # This will again be rejected by the router.
        # The following are the two transfer frames that will be
        # sent to the router.
        #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false]
        #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false]
        EMPTY_TRANSFER_MORE_TRUE = bytearray(
            b'\x00\x00\x00\x1c\x02\x00\x00\x00'
            + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
            + b'\xa0\x01\x04\x43\x42'
            + b'\x41\x40\x40\x40\x40\x42')
        EMPTY_TRANSFER_MORE_FALSE = bytearray(
            b'\x00\x00\x00\x1c\x02\x00\x00\x00'
            + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
            + b'\xa0\x01\x04\x43\x42'
            + b'\x42\x40\x40\x40\x40\x42')
        s.sendall(EMPTY_TRANSFER_MORE_TRUE)
        s.sendall(EMPTY_TRANSFER_MORE_FALSE)
        sleep(0.5)
        data = s.recv(1024)
        # The delivery has been rejected.
        self.assertIn("x00S%E", repr(data))

        s.close()


class ConnectionLinkRouteTest(TestCase):
    """
    Test connection scoped link route implementation

    Base configuration:

                                                        +-----------------+
                           +---------+    +---------+<--| blocking_sender |
    +-----------------+    |         |    |         |   +-----------------+
    | Fake LR Service |<==>|  QDR.A  |<==>|  QDR.B  |
    +-----------------+    |         |    |         |   +-------------------+
                           +---------+    +---------+-->| blocking_receiver |
                                                        +-------------------+

    The Fake Link Route Service will create connection-scoped link routes to
    QDR.A, while blocking sender/receivers on QDR.B will send/receive messages
    via the link route.
    """

    _AS_TYPE = "org.apache.qpid.dispatch.router.connection.linkRoute"

    @classmethod
    def setUpClass(cls):
        super(ConnectionLinkRouteTest, cls).setUpClass()

        b_port = cls.tester.get_port()
        configs = [
            # QDR.A:
            [('router', {'mode': 'interior', 'id': 'QDR.A'}),
             # for fake connection-scoped LRs:
             ('listener', {'role': 'normal',
                           'host': '0.0.0.0',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
             # for fake route-container LR connections:
             ('listener', {'role': 'route-container',
                           'host': '0.0.0.0',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
             # to connect to the QDR.B
             ('connector', {'role': 'inter-router',
                            'host': '127.0.0.1',
                            'port': b_port,
                            'saslMechanisms': 'ANONYMOUS'})],
            # QDR.B:
            [('router', {'mode': 'interior', 'id': 'QDR.B'}),
             # for client connections
             ('listener', {'role': 'normal',
                           'host': '0.0.0.0',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
             # for connection to QDR.A
             ('listener', {'role': 'inter-router',
                           'host': '0.0.0.0',
                           'port': b_port,
                           'saslMechanisms': 'ANONYMOUS'})]
        ]

        cls.routers = []
        for c in configs:
            config = Qdrouterd.Config(c)
            cls.routers.append(cls.tester.qdrouterd(config=config, wait=False))
        cls.QDR_A = cls.routers[0]
        cls.QDR_B = cls.routers[1]
        cls.QDR_A.wait_router_connected('QDR.B')
        cls.QDR_B.wait_router_connected('QDR.A')

    def _get_address(self, mgmt, addr):
        a_type = 'org.apache.qpid.dispatch.router.address'
        return list(filter(lambda a: a['name'].endswith(addr),
                           mgmt.query(a_type)))

    def test_config_file_bad(self):
        # verify that specifying a connection link route in the configuration
        # file fails
        config = [('router', {'mode': 'interior', 'id': 'QDR.X'}),
                  ('listener', {'role': 'normal',
                                'host': '0.0.0.0',
                                'port': self.tester.get_port(),
                                'saslMechanisms': 'ANONYMOUS'}),

                  ('connection.linkRoute',
                   {'pattern': "i/am/bad",
                    'direction': "out"})
                  ]

        cfg = Qdrouterd.Config(config)
        # we expect the router to fail
        router = self.tester.qdrouterd("X", cfg, wait=False, expect=Process.EXIT_FAIL)  # type: Qdrouterd
        self.assertEqual(router.wait(TIMEOUT), Process.EXIT_FAIL)

    def test_mgmt(self):
        # test create, delete, and query
        mgmt_conn = BlockingConnection(self.QDR_A.addresses[0])
        mgmt_proxy = ConnLinkRouteMgmtProxy(mgmt_conn)

        for i in range(10):
            rsp = mgmt_proxy.create_conn_link_route("lr1-%d" % i,
                                                    {'pattern': "*/hi/there/%d" % i,
                                                     'direction':
                                                     'out' if i % 2 else 'in'})
            self.assertEqual(201, rsp.status_code)

        # test query
        rsp = mgmt_proxy.query_conn_link_routes()
        self.assertEqual(200, rsp.status_code)
        self.assertEqual(10, len(rsp.results))
        entities = rsp.results

        # test read
        rsp = mgmt_proxy.read_conn_link_route('lr1-5')
        self.assertEqual(200, rsp.status_code)
        self.assertEqual("lr1-5", rsp.attrs['name'])
        self.assertEqual("*/hi/there/5", rsp.attrs['pattern'])
        self.assertEqual(mgmt_conn.container.container_id,
                         rsp.attrs['containerId'])

        # bad creates
        attrs = [{'pattern': "bad", 'direction': "bad"},
                 {'direction': 'in'},
                 {},
                 {'pattern': ''},
                 {'pattern': 7}]
        for a in attrs:
            rsp = mgmt_proxy.create_conn_link_route("iamnoone", a)
            self.assertEqual(400, rsp.status_code)

        # bad read
        rsp = mgmt_proxy.read_conn_link_route('iamnoone')
        self.assertEqual(404, rsp.status_code)

        # bad delete
        rsp = mgmt_proxy.delete_conn_link_route('iamnoone')
        self.assertEqual(404, rsp.status_code)

        # delete all
        for r in entities:
            self.assertEqual(200, r.status_code)
            rsp = mgmt_proxy.delete_conn_link_route(r.attrs['name'])
            self.assertEqual(204, rsp.status_code)

        # query - should be none left
        rsp = mgmt_proxy.query_conn_link_routes()
        self.assertEqual(200, rsp.status_code)
        self.assertEqual(0, len(rsp.results))

    def test_address_propagation(self):
        # test service that creates and deletes connection link routes
        fs = ConnLinkRouteService(self.QDR_A.addresses[1], container_id="FakeService",
                                  config=[("clr1",
                                           {"pattern": "flea.*",
                                            "direction": "out"}),
                                          ("clr2",
                                           {"pattern": "flea.*",
                                            "direction": "in"})])
        self.assertEqual(2, len(fs.values))

        # the address should propagate to A and B
        self.QDR_A.wait_address(address="flea.*", count=2)
        self.QDR_B.wait_address(address="flea.*", count=2)

        # now have the service delete the config
        fs.delete_config()

        # eventually the addresses will be un-published
        mgmt_A = QdManager(self, address=self.QDR_A.addresses[0])
        mgmt_B = QdManager(self, address=self.QDR_B.addresses[0])
        deadline = time() + TIMEOUT
        while (self._get_address(mgmt_A, "flea.*")
               or self._get_address(mgmt_B, "flea.*")):
            self.assertTrue(time() < deadline)
            sleep(0.1)

        fs.join()

    # simple forwarding tests with auto delete
    def test_send_receive(self):
        COUNT = 5
        mgmt_A = QdManager(self, address=self.QDR_A.addresses[0])
        mgmt_B = QdManager(self, address=self.QDR_B.addresses[0])

        # connect broker to A route-container
        fs = ConnLinkRouteService(self.QDR_A.addresses[1], container_id="FakeService",
                                  config=[("clr1",
                                           {"pattern": "flea.*",
                                            "direction": "out"}),
                                          ("clr2",
                                           {"pattern": "flea.*",
                                            "direction": "in"})])
        self.assertEqual(2, len(fs.values))

        # wait for the address to propagate to B
        self.QDR_B.wait_address(address="flea.*", count=2)

        # ensure the link routes are not visible via other connections
        clrs = mgmt_A.query(self._AS_TYPE)
        self.assertEqual(0, len(clrs))

        # send from A to B
        r = AsyncTestReceiver(self.QDR_B.addresses[0],
                              "flea.B",
                              container_id="flea.BReceiver")
        s = AsyncTestSender(self.QDR_A.addresses[0],
                            "flea.B",
                            container_id="flea.BSender",
                            message=Message(body="SENDING TO flea.B"),
                            count=COUNT)
        s.wait()   # for sender to complete
        for i in range(COUNT):
            self.assertEqual("SENDING TO flea.B",
                             r.queue.get(timeout=TIMEOUT).body)
        r.stop()
        self.assertEqual(COUNT, fs.in_count)

        # send from B to A
        r = AsyncTestReceiver(self.QDR_A.addresses[0],
                              "flea.A",
                              container_id="flea.AReceiver")
        s = AsyncTestSender(self.QDR_B.addresses[0],
                            "flea.A",
                            container_id="flea.ASender",
                            message=Message(body="SENDING TO flea.A"),
                            count=COUNT)
        s.wait()
        for i in range(COUNT):
            self.assertEqual("SENDING TO flea.A",
                             r.queue.get(timeout=TIMEOUT).body)
        r.stop()
        self.assertEqual(2 * COUNT, fs.in_count)

        # once the fake service closes its conn the link routes
        # are removed so the link route addresses must be gone
        fs.join()

        mgmt_A = QdManager(self, address=self.QDR_A.addresses[0])
        mgmt_B = QdManager(self, address=self.QDR_B.addresses[0])
        deadline = time() + TIMEOUT
        while (self._get_address(mgmt_A, "flea.*")
               or self._get_address(mgmt_B, "flea.*")):
            self.assertTrue(time() < deadline)
            sleep(0.1)


class ConnLinkRouteService(FakeBroker):
    def __init__(self, url, container_id, config, timeout=TIMEOUT):
        self.conn = None
        self.mgmt_proxy = None
        self.mgmt_sender = None
        self.mgmt_receiver = None
        self._config = config
        self._config_index = 0
        self._config_done = Event()
        self._config_error = None
        self._config_values = []
        self._cleaning_up = False
        self._delete_done = Event()
        self._delete_count = 0
        self._event_injector = EventInjector()
        self._delete_event = ApplicationEvent("delete_config")
        super(ConnLinkRouteService, self).__init__(url, container_id)
        if self._config_done.wait(timeout) is False:
            raise Exception("Timed out waiting for configuration setup")
        if self._config_error is not None:
            raise Exception("Error: %s" % self._config_error)

    @property
    def values(self):
        return self._config_values

    def delete_config(self):
        self._event_injector.trigger(self._delete_event)
        if self._delete_done.wait(TIMEOUT) is False:
            raise Exception("Timed out waiting for configuration delete")

    def on_start(self, event):
        """
        Do not create an acceptor, actively connect instead
        """
        event.container.selectable(self._event_injector)
        self.conn = event.container.connect(self.url)

    def on_connection_opened(self, event):
        if event.connection == self.conn:
            if self.mgmt_receiver is None:
                self.mgmt_receiver = event.container.create_receiver(self.conn,
                                                                     dynamic=True)
        super(ConnLinkRouteService, self).on_connection_opened(event)

    def on_connection_closed(self, event):
        if self._event_injector:
            self._event_injector.close()
            self._event_injector = None
        super(ConnLinkRouteService, self).on_connection_closed(event)

    def on_link_opened(self, event):
        if event.link == self.mgmt_receiver:
            self.mgmt_proxy = MgmtMsgProxy(self.mgmt_receiver.remote_source.address)
            self.mgmt_sender = event.container.create_sender(self.conn,
                                                             target="$management")

    def on_link_error(self, event):
        # when a remote client disconnects the service will get a link error
        # that is expected - simply clean up the link
        self.on_link_closing(event)

    def on_sendable(self, event):
        if event.sender == self.mgmt_sender:
            if not self._cleaning_up:
                if self._config_index < len(self._config):
                    cfg = self._config[self._config_index]
                    msg = self.mgmt_proxy.create_conn_link_route(cfg[0], cfg[1])
                    self.mgmt_sender.send(msg)
                    self._config_index += 1
            elif self._config_values:
                cv = self._config_values.pop()
                msg = self.mgmt_proxy.delete_conn_link_route(cv['name'])
                self._delete_count += 1
        else:
            super(ConnLinkRouteService, self).on_sendable(event)

    def on_message(self, event):
        if event.receiver == self.mgmt_receiver:
            response = self.mgmt_proxy.response(event.message)
            if response.status_code == 201:
                # created:
                self._config_values.append(response.attrs)
                if len(self._config_values) == len(self._config):
                    self._config_done.set()
            elif response.status_code == 204:
                # deleted
                self._delete_count -= 1
                if (not self._config_values) and self._delete_count == 0:
                    self._delete_done.set()
            else:
                # error
                self._config_error = ("mgmt failed: %s" %
                                      response.status_description)
                self._config_done.set()
                self._delete_done.set()
        else:
            super(ConnLinkRouteService, self).on_message(event)

    def on_delete_config(self, event):
        if not self._cleaning_up:
            self._cleaning_up = True
            if not self._config_values:
                self._delete_done.set()
            else:
                try:
                    while self.mgmt_sender.credit > 0:
                        cv = self._config_values.pop()
                        msg = self.mgmt_proxy.delete_conn_link_route(cv["name"])
                        self.mgmt_sender.send(msg)
                        self._delete_count += 1
                except IndexError:
                    pass


class ConnLinkRouteMgmtProxy(object):
    """
    Manage connection scoped link routes over a given connection.
    While the connection remains open the connection scoped links will remain
    configured and active
    """

    def __init__(self, bconn, credit=250):
        self._receiver = bconn.create_receiver(address=None, dynamic=True, credit=credit)
        self._sender = bconn.create_sender(address="$management")
        self._proxy = MgmtMsgProxy(self._receiver.link.remote_source.address)

    def __getattr__(self, key):
        # wrap accesses to the management message functions so we can send and
        # receive the messages using the blocking links
        f = getattr(self._proxy, key)
        if not callable(f):
            return f

        def _func(*args, **kwargs):
            self._sender.send(f(*args, **kwargs))
            return self._proxy.response(self._receiver.receive())
        return _func


class InvalidTagTest(MessagingHandler):
    """Verify that a message with an invalid tag length is rejected
    """

    def __init__(self, router_addr):
        super(InvalidTagTest, self).__init__(auto_accept=False, auto_settle=False)
        self.test_conn = None
        self.test_address = router_addr
        self.tx_ct = 0
        self.accept_ct = 0
        self.reject_ct = 0
        self.error = None

    def timeout(self):
        self.error = "Timeout expired: sent=%d rcvd=%d" % (self.tx_ct,
                                                           self.accept_ct
                                                           + self.reject_ct)
        if self.test_conn:
            self.test_conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.test_conn = event.container.connect(self.test_address)
        rx = event.container.create_receiver(self.test_conn, "org.apache.foo")

    def on_link_opened(self, event):
        if event.receiver:
            event.receiver.flow(100)
            event.container.create_sender(event.connection, "org.apache.foo")

    def on_sendable(self, event):
        if self.tx_ct < 10:
            self.tx_ct += 1
            if self.tx_ct == 5:
                event.sender.send(Message(body="YO"), tag=str("X" * 64))
            else:
                event.sender.send(Message(body="YO"), tag=str("BLAH%d" %
                                                              self.tx_ct))

    def on_accepted(self, event):
        self.accept_ct += 1
        event.delivery.settle()
        if self.accept_ct == 9 and self.reject_ct == 1:
            event.connection.close()
            self.timer.cancel()

    def on_rejected(self, event):
        self.reject_ct += 1
        event.delivery.settle()

    def on_message(self, event):
        event.delivery.update(Delivery.ACCEPTED)
        event.delivery.settle()

    def run(self):
        Container(self).run()


class Dispatch1428(TestCase):
    """
    Sets up 2 routers (one of which are acting as brokers (QDR.A)).

        QDR.A acting broker #1
             +---------+         +---------+
             |         | <------ |         |
             |  QDR.A  |         |  QDR.B  |
             |         | ------> |         |
             +---------+         +---------+

    """
    @classmethod
    def get_router(cls, index):
        return cls.routers[index]

    @classmethod
    def setUpClass(cls):
        """Start two routers"""
        super(Dispatch1428, cls).setUpClass()

        def router(name, connection):

            config = [
                ('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
            ] + connection

            config = Qdrouterd.Config(config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))

        cls.routers = []
        a_listener_port = cls.tester.get_port()
        b_listener_port = cls.tester.get_port()

        router('A',
               [
                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
               ])
        router('B',
               [
                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
                   ('connector', {'name': 'one', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
                   ('connector', {'name': 'two', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'})
               ]
               )
        sleep(2)

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def test_both_link_routes_active(self):
        cmds = [
            'CREATE --type=linkRoute name=foo prefix=foo direction=in connection=one',
            'CREATE --type=linkRoute name=bar prefix=bar direction=in connection=two',
            'CREATE --type=linkRoute name=baz prefix=baz direction=in containerId=QDR.A'
        ]
        for c in cmds:
            self.run_qdmanage(cmd=c, address=self.routers[1].addresses[0])

        # Now that the qdmanage has run, query the link routes and make sure that their "operStatus" is "active" before
        # running any of the tests.
        long_type = 'org.apache.qpid.dispatch.router.config.linkRoute'
        qd_manager = QdManager(self, address=self.routers[1].addresses[0])

        for i in range(5):
            all_link_routes_activated = True
            link_routes = qd_manager.query(long_type)
            for link_route in link_routes:
                oper_status = link_route['operStatus']
                if oper_status != "active":
                    all_link_routes_activated = False
                    break
            if not all_link_routes_activated:
                # One or more of the link routes have not been activated.
                # Check after one second.
                sleep(1)
            else:
                break

        # All link routes created in this test MUST be activated before
        # we can continue further testing.
        self.assertTrue(all_link_routes_activated)

        first = SendReceive("%s/foo" % self.routers[1].addresses[0], "%s/foo" % self.routers[0].addresses[0])
        first.run()
        self.assertEqual(None, first.error)
        second = SendReceive("%s/bar" % self.routers[1].addresses[0], "%s/bar" % self.routers[0].addresses[0])
        second.run()
        self.assertEqual(None, second.error)
        third = SendReceive("%s/baz" % self.routers[1].addresses[0], "%s/baz" % self.routers[0].addresses[0])
        third.run()
        self.assertEqual(None, third.error)


class SendReceive(MessagingHandler):
    def __init__(self, send_url, recv_url, message=None):
        super(SendReceive, self).__init__()
        self.send_url = send_url
        self.recv_url = recv_url
        self.message = message or Message(body="SendReceiveTest")
        self.sent = False
        self.error = None

    def close(self):
        self.sender.close()
        self.receiver.close()
        self.sender.connection.close()
        self.receiver.connection.close()

    def timeout(self):
        self.error = "Timeout Expired - Check for cores"
        self.close()

    def stop(self):
        self.close()
        self.timer.cancel()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        event.container.container_id = "SendReceiveTestClient"
        self.sender = event.container.create_sender(self.send_url)
        self.receiver = event.container.create_receiver(self.recv_url)

    def on_sendable(self, event):
        if not self.sent:
            event.sender.send(self.message)
            self.sent = True

    def on_message(self, event):
        if self.message.body != event.message.body:
            self.error = "Incorrect message. Got %s, expected %s" % (event.message.body, self.message.body)

    def on_accepted(self, event):
        self.stop()

    def run(self):
        Container(self).run()


class DispositionSniffer(MessagingHandler):
    """
    Capture the outgoing delivery after the remote has set its terminal
    outcome.  Used by tests that need to examine the delivery state
    """

    def __init__(self, send_url):
        super(DispositionSniffer, self).__init__(auto_accept=False,
                                                 auto_settle=False)
        self.send_url = send_url
        self.sender = None
        self.timer = None
        self.error = None
        self.sent = False
        self.delivery = None

    def close(self):
        if self.timer:
            self.timer.cancel()
        if self.sender:
            self.sender.close()
            self.sender.connection.close()

    def timeout(self):
        self.error = "Timeout Expired - Check for cores"
        self.close()

    def stop(self):
        self.close()

    def on_start(self, event):
        self.timer  = event.reactor.schedule(TIMEOUT, TestTimeout(self))
        self.sender = event.container.create_sender(self.send_url)

    def on_sendable(self, event):
        if not self.sent:
            event.sender.send(Message(body="HI"))
            self.sent = True

    def on_accepted(self, event):
        self.stop()

    def on_released(self, event):
        self.delivery = event.delivery
        self.close()

    def on_modified(self, event):
        self.delivery = event.delivery
        self.close()

    def on_rejected(self, event):
        self.delivery = event.delivery
        self.close()

    def run(self):
        Container(self).run()


class LinkRoute3Hop(TestCase):
    """
    Sets up a linear 3 hop router network for testing multi-hop link routes.

             +---------+         +---------+         +---------+     +------------------+
             |         | <------ |         | <-----  |         |<----| blocking_senders |
             |  QDR.A  |         |  QDR.B  |         |  QDR.C  |     +------------------+
             |         | ------> |         | ------> |         |     +--------------------+
             +---------+         +---------+         +---------+---->| blocking_receivers |
                  ^                                                  +--------------------+
                  |
                  V
           +-------------+
           | FakeService |
           +-------------+

    """

    @classmethod
    def setUpClass(cls):
        super(LinkRoute3Hop, cls).setUpClass()

        b_port = cls.tester.get_port()
        configs = [
            # QDR.A:
            [('router', {'mode': 'interior', 'id': 'QDR.A'}),
             # for client access
             ('listener', {'role': 'normal',
                           'host': '0.0.0.0',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
             # for fake service:
             ('listener', {'role': 'route-container',
                           'host': '0.0.0.0',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
             # to connect to the QDR.B
             ('connector', {'role': 'inter-router',
                            'host': '127.0.0.1',
                            'port': b_port,
                            'saslMechanisms': 'ANONYMOUS'}),
             # the routes
             ('linkRoute', {'prefix': 'closest/test-client', 'containerId': 'FakeService', 'direction': 'in'}),
             ('linkRoute', {'prefix': 'closest/test-client', 'containerId': 'FakeService', 'direction': 'out'})
             ],
            # QDR.B:
            [('router', {'mode': 'interior', 'id': 'QDR.B'}),
             # for client connections
             ('listener', {'role': 'normal',
                           'host': '0.0.0.0',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
             # for inter-router connections from QDR.A and QDR.C
             ('listener', {'role': 'inter-router',
                           'host': '0.0.0.0',
                           'port': b_port,
                           'saslMechanisms': 'ANONYMOUS'}),
             ('linkRoute', {'prefix': 'closest/test-client', 'direction': 'in'}),
             ('linkRoute', {'prefix': 'closest/test-client', 'direction': 'out'})
             ],
            # QDR.C
            [('router', {'mode': 'interior', 'id': 'QDR.C'}),
             # for client connections
             ('listener', {'role': 'normal',
                           'host': '0.0.0.0',
                           'port': cls.tester.get_port(),
                           'saslMechanisms': 'ANONYMOUS'}),
             # to connect to the QDR.B
             ('connector', {'role': 'inter-router',
                            'host': '127.0.0.1',
                            'port': b_port,
                            'saslMechanisms': 'ANONYMOUS'}),
             ('linkRoute', {'prefix': 'closest/test-client', 'direction': 'in'}),
             ('linkRoute', {'prefix': 'closest/test-client', 'direction': 'out'})
             ]
        ]

        cls.routers = []
        for c in configs:
            config = Qdrouterd.Config(c)
            cls.routers.append(cls.tester.qdrouterd(config=config, wait=False))
        cls.QDR_A = cls.routers[0]
        cls.QDR_B = cls.routers[1]
        cls.QDR_C = cls.routers[2]

        cls.QDR_A.wait_router_connected('QDR.B')
        cls.QDR_B.wait_router_connected('QDR.A')
        cls.QDR_B.wait_router_connected('QDR.C')
        cls.QDR_C.wait_router_connected('QDR.B')
        cls.QDR_C.wait_router_connected('QDR.A')
        cls.QDR_A.wait_router_connected('QDR.C')

    def test_01_parallel_link_routes(self):
        """
        Verify Q2/Q3 recovery in the case of multiple link-routes sharing the
        same session.
        """
        send_clients = 10
        send_batch = 10
        total = send_clients * send_batch

        fake_service = FakeService(self.QDR_A.addresses[1],
                                   container_id="FakeService")
        self.QDR_C.wait_address("closest/test-client",
                                remotes=1)

        env = None
        rx = self.popen(["test-receiver",
                         "-a", self.QDR_C.addresses[0],
                         "-c", str(total),
                         "-s", "closest/test-client"],
                        env=env,
                        expect=Process.EXIT_OK)

        def _spawn_sender(x):
            return self.popen(["test-sender",
                               "-a", self.QDR_C.addresses[0],
                               "-c", str(send_batch),
                               "-i", "TestSender-%s" % x,
                               "-sx",   # huge message size to trigger Q2/Q3
                               "-t", "closest/test-client"],
                              env=env,
                              expect=Process.EXIT_OK)

        senders = [_spawn_sender(s) for s in range(send_clients)]

        for tx in senders:
            out_text, out_err = tx.communicate(timeout=TIMEOUT)
            if tx.returncode:
                raise Exception("Sender failed: %s %s" % (out_text, out_err))

        if rx.wait(timeout=TIMEOUT):
            raise Exception("Receiver failed to consume all messages in=%s out=%s",
                            fake_service.in_count,
                            fake_service.out_count)

        fake_service.join()
        self.assertEqual(total, fake_service.in_count)
        self.assertEqual(total, fake_service.out_count)

        self.QDR_C.wait_address_unsubscribed("closest/test-client")

    def test_02_modified_outcome(self):
        """
        Ensure all elements of a Modified disposition are passed thru the link
        route
        """

        class FakeServiceModified(FakeService):
            def on_message(self, event):
                # set non-default values for delivery state for delivery to
                # remote endpoint
                dlv = event.delivery
                dlv.local.failed = True
                dlv.local.undeliverable = True
                dlv.local.annotations = {symbol("Key"): "Value"}
                dlv.update(Delivery.MODIFIED)
                dlv.settle()

        fake_service = FakeServiceModified(self.QDR_A.addresses[1],
                                           container_id="FakeService",
                                           auto_accept=False,
                                           auto_settle=False)
        self.QDR_C.wait_address("closest/test-client",
                                remotes=1)

        sniffer = DispositionSniffer("%s/closest/test-client" %
                                     self.QDR_C.addresses[0])
        sniffer.run()
        self.assertEqual(None, sniffer.error)
        state = sniffer.delivery.remote
        self.assertTrue(state.failed)
        self.assertTrue(state.undeliverable)
        self.assertTrue(state.annotations is not None)
        self.assertTrue(symbol('Key') in state.annotations)
        self.assertEqual('Value', state.annotations[symbol('Key')])

        fake_service.join()
        self.QDR_C.wait_address_unsubscribed("closest/test-client")

    def test_03_rejected_outcome(self):
        """
        Ensure all elements of a Rejected disposition are passed thru the link
        route
        """

        class FakeServiceReject(FakeService):
            def on_message(self, event):
                # set non-default values for delivery state for delivery to
                # remote endpoint
                dlv = event.delivery
                dlv.local.condition = Condition("condition-name",
                                                str("condition-description"),
                                                {symbol("condition"): "info"})
                dlv.update(Delivery.REJECTED)
                dlv.settle()

        fake_service = FakeServiceReject(self.QDR_A.addresses[1],
                                         container_id="FakeService",
                                         auto_accept=False,
                                         auto_settle=False)
        self.QDR_C.wait_address("closest/test-client",
                                remotes=1)

        sniffer = DispositionSniffer("%s/closest/test-client" %
                                     self.QDR_C.addresses[0])
        sniffer.run()
        self.assertEqual(None, sniffer.error)
        state = sniffer.delivery.remote
        self.assertTrue(state.condition is not None)
        self.assertEqual("condition-name", state.condition.name)
        self.assertEqual("condition-description", state.condition.description)
        self.assertTrue(state.condition.info is not None)
        self.assertTrue(symbol("condition") in state.condition.info)
        self.assertEqual('info', state.condition.info[symbol("condition")])

        fake_service.join()
        self.QDR_C.wait_address_unsubscribed("closest/test-client")

    def test_04_extension_state(self):
        """
        system_tests_two_routers.TwoRouterExtensionsStateTest() already tests
        sending extended state via a link route.
        """
        pass


if __name__ == '__main__':
    unittest.main(main_module())
