#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

from proton import Condition, Message, Delivery, Url, symbol, Timeout
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, DIR, Process
from system_test import unittest, QdManager
from proton.handlers import MessagingHandler, TransactionHandler
from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
from proton.utils import BlockingConnection, SyncRequestResponse
from proton import VERSION as PROTON_VERSION
from proton import Terminus
from proton import Data
from qpid_dispatch.management.client import Node
import os, json
from subprocess import PIPE, STDOUT
from time import sleep
from test_broker import FakeBroker


CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
CONNECTION_PROPERTIES_SYMBOL = dict()
CONNECTION_PROPERTIES_SYMBOL[symbol("connection")] = symbol("properties")
CONNECTION_PROPERTIES_BINARY = {b'client_identifier': b'policy_server'}


#====================================================
# Helper classes for all tests.
#====================================================


# Named timers allow test code to distinguish between several
# simultaneous timers, going off at different rates.
class MultiTimeout ( object ):
    def __init__(self, parent, name):
        self.parent = parent
        self.name   = name

    def on_timer_task(self, event):
        self.parent.timeout ( self.name )


class OneRouterTest(TestCase):
    """System tests involving a single router"""
    @classmethod
    def setUpClass(cls):
        """Start a router and a messenger"""
        super(OneRouterTest, cls).setUpClass()
        name = "test-router"
        policy_config_path = os.path.join(DIR, 'one-router-policy')
        OneRouterTest.listen_port = cls.tester.get_port()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR', 'allowUnsettledMulticast': 'yes'}),
            ('policy', {'policyDir': policy_config_path,
                        'enableVhostPolicy': 'true'}),

            # Setting the stripAnnotations to 'no' so that the existing tests will work.
            # Setting stripAnnotations to no will not strip the annotations and any tests that were already in this file
            # that were expecting the annotations to not be stripped will continue working.
            ('listener', {'port': OneRouterTest.listen_port, 'maxFrameSize': '2048', 'stripAnnotations': 'no'}),

            # The following listeners were exclusively added to test the stripAnnotations attribute in qdrouterd.conf file
            # Different listeners will be used to test all allowed values of stripAnnotations ('no', 'both', 'out', 'in')
            ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'no'}),
            ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'both'}),
            ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'out'}),
            ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'in'}),

            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
            ('address', {'prefix': 'unavailable', 'distribution': 'unavailable'})
        ])
        cls.router = cls.tester.qdrouterd(name, config)
        cls.router.wait_ready()
        cls.address = cls.router.addresses[0]
        cls.closest_count = 1

        cls.no_strip_addr   = cls.router.addresses[1]
        cls.both_strip_addr = cls.router.addresses[2]
        cls.out_strip_addr  = cls.router.addresses[3]
        cls.in_strip_addr   = cls.router.addresses[4]

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        p = self.popen(
            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address, '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception(out if out else str(e))
        return out


    def test_01_listen_error(self):
        # Make sure a router exits if a initial listener fails, doesn't hang.
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'bad'}),
            ('listener', {'port': OneRouterTest.listen_port})])
        r = Qdrouterd(name="expect_fail", config=config, wait=False)
        self.assertEqual(1, r.wait())


    def test_02_pre_settled ( self ):
        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
        OneRouterTest.closest_count += 1
        test = PreSettled ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    def test_03_multicast_unsettled ( self ) :
        n_receivers = 5
        addr = self.address + '/multicast/1'
        test = MulticastUnsettled ( addr, n_messages = 10, n_receivers = 5 )
        test.run ( )
        self.assertEqual ( None, test.error )

    # DISPATCH-1277. This test will fail with a policy but without the fix in policy_local.py
    # In other words, if the max-frame-size was 2147483647 and not 16384, this
    # test would fail.
    def test_04_disposition_returns_to_closed_connection ( self ) :
        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
        OneRouterTest.closest_count += 1
        test = DispositionReturnsToClosedConnection ( addr, n_messages = 100 )
        test.run ( )
        self.assertEqual ( None, test.error )


    def test_05_sender_settles_first ( self ) :
        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
        OneRouterTest.closest_count += 1
        test = SenderSettlesFirst ( addr, n_messages = 100 )
        test.run ( )
        self.assertEqual ( None, test.error )


    def test_06_propagated_disposition ( self ) :
        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
        OneRouterTest.closest_count += 1
        test = PropagatedDisposition ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    def test_07_unsettled_undeliverable ( self ) :
        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
        OneRouterTest.closest_count += 1
        test = UsettledUndeliverable ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    def test_08_three_ack ( self ) :
        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
        OneRouterTest.closest_count += 1
        test = ThreeAck ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    def test_09_message_annotations ( self ) :
        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
        OneRouterTest.closest_count += 1
        test = MessageAnnotations ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    # Tests stripping of ingress and egress annotations.
    # There is a property in qdrouter.json called stripAnnotations with possible values of ["in", "out", "both", "no"]
    # The default for stripAnnotations is "both" (which means strip annotations on both ingress and egress)
    # This test will test the stripAnnotations = no option - meaning no annotations must be stripped.
    # We will send in a custom annotation and make sure that we get back 3 annotations on the received message
    def test_10_strip_message_annotations_custom(self):
        addr = self.no_strip_addr + "/strip_message_annotations_no_custom/1"
        OneRouterTest.closest_count += 1
        test = StripMessageAnnotationsCustom ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    # stripAnnotations property is set to "no" 
    def test_11_test_strip_message_annotations_no(self):
        addr = self.no_strip_addr + "/strip_message_annotations_no/1"
        test = StripMessageAnnotationsNo ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    # stripAnnotations property is set to "no"
    def test_12_test_strip_message_annotations_no_add_trace(self):
        addr = self.no_strip_addr + "/strip_message_annotations_no_add_trace/1"
        test = StripMessageAnnotationsNoAddTrace ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    # Dont send any pre-existing ingress or trace annotations. Make sure that there 
    # are no outgoing message annotations stripAnnotations property is set to "both".
    # Custom annotations, however, are not stripped.
    def test_13_test_strip_message_annotations_both(self):
        addr = self.both_strip_addr + "/strip_message_annotations_both/1"
        test = StripMessageAnnotationsBoth ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    # Dont send any pre-existing ingress or trace annotations. Make sure that there 
    # are no outgoing message annotations
    # stripAnnotations property is set to "out"
    def test_14_test_strip_message_annotations_out(self):
        addr = self.out_strip_addr + "/strip_message_annotations_out/1"
        test = StripMessageAnnotationsOut ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    # Send in pre-existing trace and ingress and annotations and make sure 
    # that they are not in the outgoing annotations.
    # stripAnnotations property is set to "in"
    def test_15_test_strip_message_annotations_in(self):
        addr = self.in_strip_addr + "/strip_message_annotations_in/1"
        test = StripMessageAnnotationsIn ( addr, n_messages = 10 )
        test.run ( )
        self.assertEqual ( None, test.error )


    def test_16_management(self):
        test = ManagementTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_17_management_get_operations(self):
        test = ManagementGetOperationsTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_18_management_not_implemented(self):
        test = ManagementNotImplemented(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_19_semantics_multicast(self):
        test = SemanticsMulticast(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_20_semantics_closest(self):
        test = SemanticsClosest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_21_semantics_balanced(self):
        test = SemanticsBalanced(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_22_to_override(self):
        test = MessageAnnotaionsPreExistingOverride(self.address)
        test.run()

    def test_23_send_settle_mode_settled(self):
        """
        The receiver sets a snd-settle-mode of settle thus indicating that it wants to receive settled messages from
        the sender. This tests make sure that the delivery that comes to the receiver comes as already settled.
        """
        send_settle_mode_test = SndSettleModeTest(self.address)
        send_settle_mode_test.run()
        self.assertTrue(send_settle_mode_test.message_received)
        self.assertTrue(send_settle_mode_test.delivery_already_settled)

    def test_24_excess_deliveries_released(self):
        """
        Message-route a series of deliveries where the receiver provides credit for a subset and
        once received, closes the link.  The remaining deliveries should be released back to the sender.
        """
        test = ExcessDeliveriesReleasedTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_25_multicast_unsettled(self):
        test = MulticastUnsettledTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_17_multiframe_presettled(self):
        test = MultiframePresettledTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_27_released_vs_modified(self):
        test = ReleasedVsModifiedTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_28_appearance_of_balance(self):
        test = AppearanceOfBalanceTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_29_batched_settlement(self):
        test = BatchedSettlementTest(self.address)
        test.run()
        self.assertEqual(None, test.error)
        self.assertTrue(test.accepted_count_match)

    def test_30_presettled_overflow(self):
        test = PresettledOverflowTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_31_create_unavailable_sender(self):
        test = UnavailableSender(self.address)
        test.run()
        self.assertTrue(test.passed)

    def test_32_create_unavailable_receiver(self):
        test = UnavailableReceiver(self.address)
        test.run()
        self.assertTrue(test.passed)

    def test_33_large_streaming_test(self):
        test = LargeMessageStreamTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_34_reject_coordinator(self):
        test = RejectCoordinatorTest(self.address)
        test.run()
        self.assertTrue(test.passed)

    def test_35_reject_disposition(self):
        test = RejectDispositionTest(self.address)
        test.run()
        self.assertTrue(test.received_error)
        self.assertTrue(test.reject_count_match)

    def test_37_connection_properties_unicode_string(self):
        """
        Tests connection property that is a map of unicode strings and integers
        """
        connection = BlockingConnection(self.router.addresses[0],
                                        timeout=60,
                                        properties=CONNECTION_PROPERTIES_UNICODE_STRING)
        client = SyncRequestResponse(connection)

        node = Node.connect(self.router.addresses[0])

        results = node.query(type='org.apache.qpid.dispatch.connection', attribute_names=[u'properties']).results

        found = False
        for result in results:
            if u'connection' in result[0] and u'int_property' in result[0]:
                found = True
                self.assertEqual(result[0][u'connection'], u'properties')
                self.assertEqual(result[0][u'int_property'], 6451)

        self.assertTrue(found)
        client.connection.close()

    def test_38_connection_properties_symbols(self):
        """
        Tests connection property that is a map of symbols
        """
        connection = BlockingConnection(self.router.addresses[0],
                                        timeout=60,
                                        properties=CONNECTION_PROPERTIES_SYMBOL)
        client = SyncRequestResponse(connection)

        node = Node.connect(self.router.addresses[0])

        results = node.query(type='org.apache.qpid.dispatch.connection', attribute_names=[u'properties']).results

        found = False
        for result in results:
            if u'connection' in result[0]:
                if result[0][u'connection'] == u'properties':
                    found = True
                    break

        self.assertTrue(found)

        client.connection.close()

    def test_40_anonymous_sender_no_receiver(self):
        test = AnonymousSenderNoRecvLargeMessagedTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_41_large_streaming_close_conn_test(self):
        test = LargeMessageStreamCloseConnTest(self.address)
        test.run()
        self.assertEqual(None, test.error)

    def test_42_unsettled_large_message_test(self):
        test = UnsettledLargeMessageTest(self.address, 250)
        test.run()
        self.assertEqual(None, test.error)

    def test_43_dropped_presettled_receiver_stops(self):
        local_node = Node.connect(self.address, timeout=TIMEOUT)
        res = local_node.query('org.apache.qpid.dispatch.router')
        deliveries_ingress = res.attribute_names.index(
            'deliveriesIngress')
        ingress_delivery_count = res.results[0][deliveries_ingress]
        test = DroppedPresettledTest(self.address, 200, ingress_delivery_count)
        test.run()
        self.assertEqual(None, test.error)

    def test_44_delete_connection_fail(self):
        """
        This test creates a blocking connection and tries to update the adminStatus on that connection to "deleted".
        Since the policy associated with this router set allowAdminStatusUpdate as false,
        the update operation will not be permitted.
        """

        # Create a connection with some properties so we can easily identify the connection
        connection = BlockingConnection(self.address,
                                        properties=CONNECTION_PROPERTIES_UNICODE_STRING)
        query_command = 'QUERY --type=connection'
        outputs = json.loads(self.run_qdmanage(query_command))
        identity = None
        passed = False

        for output in outputs:
            if output.get('properties'):
                conn_properties = output['properties']
                # Find the connection that has our properties - CONNECTION_PROPERTIES_UNICODE_STRING
                # Delete that connection and run another qdmanage to see
                # if the connection is gone.
                if conn_properties.get('int_property'):
                    identity = output.get("identity")
                    if identity:
                        update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
                        try:
                            outputs = json.loads(self.run_qdmanage(update_command))
                        except Exception as e:
                            if "Forbidden" in str(e):
                                passed = True

        # The test has passed since we were not allowed to delete a connection
        # because we do not have the policy permission to do so.
        self.assertTrue(passed)

    def test_45_q2_holdoff_drop_stalled_rx(self):
        """
        Verify that dropping a slow consumer while in Q2 flow control does
        not hang the router
        """
        test = Q2HoldoffDropTest(self.router)
        test.run()
        self.assertEqual(None, test.error)

    def test_48_connection_uptime_last_dlv(self):
        test = ConnectionUptimeLastDlvTest(self.address, "test_48")
        test.run()
        self.assertEqual(None, test.error)


class Entity(object):
    def __init__(self, status_code, status_description, attrs):
        self.status_code        = status_code
        self.status_description = status_description
        self.attrs              = attrs

    def __getattr__(self, key):
        return self.attrs[key]


class RouterProxy(object):
    def __init__(self, reply_addr):
        self.reply_addr = reply_addr

    def response(self, msg):
        ap = msg.properties
        bd = msg.body
        if isinstance(bd, dict) and 'results' in bd and 'attributeNames' in bd:
            ##
            ## This is a query response
            ##
            response = []
            anames = bd['attributeNames']
            for row in bd['results']:
                cols = {}
                for i in range(len(row)):
                    cols[anames[i]] = row[i]
                response.append(Entity(ap['statusCode'], ap['statusDescription'], cols))
            return response

        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)

    def read_address(self, name):
        ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name}
        return Message(properties=ap, reply_to=self.reply_addr)

    def query_addresses(self):
        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'}
        return Message(properties=ap, reply_to=self.reply_addr)

    def query_links(self):
        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.link'}
        return Message(properties=ap, reply_to=self.reply_addr)


class SemanticsClosest(MessagingHandler):
    def __init__(self, address):
        super(SemanticsClosest, self).__init__()
        self.address = address
        self.dest = "closest.1"
        self.timer = None
        self.conn = None
        self.sender = None
        self.receiver_a = None
        self.receiver_b = None
        self.receiver_c = None
        self.num_messages = 100
        self.n_received_a = 0
        self.n_received_b = 0
        self.n_received_c = 0
        self.error = None
        self.n_sent = 0
        self.rx_set = []

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        self.sender = event.container.create_sender(self.conn, self.dest)
        # Receiver on same router as the sender must receive all the messages. The other two
        # receivers are on the other router
        self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
        self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
        self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")

    def timeout(self):
        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
        self.conn.close()

    def check_if_done(self):
        if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages\
                and self.n_received_b != 0 and self.n_received_c != 0:
            self.rx_set.sort()
            #print self.rx_set
            all_messages_received = True
            for i in range(self.num_messages):
                if not i == self.rx_set[i]:
                    all_messages_received = False

            if all_messages_received:
                self.timer.cancel()
                self.conn.close()

    def on_sendable(self, event):
        if self.n_sent < self.num_messages:
            msg = Message(body={'number': self.n_sent})
            self.sender.send(msg)
            self.n_sent += 1

    def on_message(self, event):
        if event.receiver == self.receiver_a:
            self.n_received_a += 1
            self.rx_set.append(event.message.body['number'])
        if event.receiver == self.receiver_b:
            self.n_received_b += 1
            self.rx_set.append(event.message.body['number'])
        if event.receiver == self.receiver_c:
            self.n_received_c += 1
            self.rx_set.append(event.message.body['number'])

    def on_accepted(self, event):
        self.check_if_done()

    def run(self):
        Container(self).run()


class MessageAnnotaionsPreExistingOverride(MessagingHandler):
    def __init__(self, address):
        super(MessageAnnotaionsPreExistingOverride, self).__init__()
        self.address = address
        self.dest = "toov/1"
        self.error = "Pre-existing x-opt-qd.to has been stripped"
        self.timer = None
        self.conn = None
        self.sender = None
        self.receiver = None
        self.msg_not_sent = True

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        self.sender = event.container.create_sender(self.conn, self.dest)
        self.receiver = event.container.create_receiver(self.conn, self.dest)

    def timeout(self):
        self.error = "Timeout Expired: Sent message not received"
        self.conn.close()

    def bail(self, message):
        self.error = message
        self.conn.close()
        self.timer.cancel()

    def on_sendable(self, event):
        if self.msg_not_sent:
            msg = Message(body={'number': 0})
            msg.annotations = {'x-opt-qd.to': 'toov/1'}
            event.sender.send(msg)
            self.msg_not_sent = False

    def on_message(self, event):
        if 0 == event.message.body['number']:
            ma = event.message.annotations
            if ma['x-opt-qd.to'] == 'toov/1':
                self.bail(None)
            else:
                self.bail("Pre-existing x-opt-qd.to has been stripped")
        else:
            self.bail("body does not match with the sent message body")

    def run(self):
        Container(self).run()


class SemanticsMulticast(MessagingHandler):
    def __init__(self, address):
        """
        Verify that for every 1 unsettled mcast message received, N messages are sent
        out (where N == number of receivers).  Assert that multiple received
        dispositions are summarized to send out one disposition.
        """
        super(SemanticsMulticast, self).__init__(auto_accept=False)
        self.address = address
        self.dest = "multicast.2"
        self.error = None
        self.n_sent = 0
        self.n_settled = 0
        self.count = 3
        self.n_received_a = 0
        self.n_received_b = 0
        self.n_received_c = 0
        self.n_accepts = 0
        self.n_recv_ready = 0
        self.timer = None
        self.conn_1 = None
        self.conn_2 = None
        self.sender = None
        self.receiver_a = None
        self.receiver_b = None
        self.receiver_c = None

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn_1 = event.container.connect(self.address)
        self.conn_2 = event.container.connect(self.address)
        self.receiver_a = event.container.create_receiver(self.conn_2, self.dest, name="A")
        self.receiver_b = event.container.create_receiver(self.conn_1, self.dest, name="B")
        self.receiver_c = event.container.create_receiver(self.conn_2, self.dest, name="C")

    def timeout(self):
        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
        self.conn_1.close()
        self.conn_2.close()

    def check_if_done(self):
        c = self.n_received_a + self.n_received_b + self.n_received_c
        if (c == self.count
                and self.n_received_a == self.n_received_b
                and self.n_received_c == self.n_received_b
                and self.n_accepts == self.n_sent
                and self.n_settled == self.count):
            self.timer.cancel()
            self.conn_1.close()
            self.conn_2.close()

    def on_link_opened(self, event):
        if event.receiver:
            self.n_recv_ready += 1
            if self.n_recv_ready == self.count:
                self.sender = event.container.create_sender(self.conn_1, self.dest)

    def on_sendable(self, event):
        if self.n_sent == 0:
            msg = Message(body="SemanticsMulticast-Test")
            self.sender.send(msg)
            self.n_sent += 1

    def on_message(self, event):
        if event.receiver == self.receiver_a:
            self.n_received_a += 1
        if event.receiver == self.receiver_b:
            self.n_received_b += 1
        if event.receiver == self.receiver_c:
            self.n_received_c += 1
        event.delivery.update(Delivery.ACCEPTED)

    def on_accepted(self, event):
        self.n_accepts += 1
        event.delivery.settle()

    def on_settled(self, event):
        self.n_settled += 1
        self.check_if_done()

    def run(self):
        Container(self).run()


class ManagementNotImplemented(MessagingHandler):
    def __init__(self, address):
        super(ManagementNotImplemented, self).__init__()
        self.address = address
        self.timer = None
        self.conn = None
        self.sender = None
        self.receiver = None
        self.sent_count = 0
        self.error = None

    def timeout(self):
        self.error = "No response received for management request"
        self.conn.close()

    def bail(self, message):
        self.error = message
        self.conn.close()
        self.timer.cancel()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        self.sender = event.container.create_sender(self.conn)
        self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)

    def on_link_opened(self, event):
        if event.receiver == self.receiver:
            request = Message()
            request.address = "amqp:/_local/$management"
            request.reply_to = event.receiver.remote_source.address
            request.properties = {u'type': u'org.amqp.management',
                                  u'name': u'self',
                                  u'operation': u'NOT-IMPL'}
            self.sender.send(request)

    def run(self):
        Container(self).run()

    def on_message(self, event):
        if event.receiver == self.receiver:
            if event.message.properties['statusCode'] == 501:
                self.bail(None)
            else:
                self.bail("The return status code is %s. It should be 501" % str(event.message.properties['statusCode']))


class ManagementGetOperationsTest(MessagingHandler):
    def __init__(self, address):
        super(ManagementGetOperationsTest, self).__init__()
        self.address = address
        self.timer = None
        self.conn = None
        self.sender = None
        self.receiver = None
        self.sent_count = 0
        self.error = None

    def timeout(self):
        self.error = "No response received for management request"
        self.conn.close()

    def bail(self, message):
        self.error = message
        self.conn.close()
        self.timer.cancel()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        self.sender = event.container.create_sender(self.conn)
        self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)

    def on_link_opened(self, event):
        if self.receiver == event.receiver:
            request = Message()
            request.address = "amqp:/_local/$management"
            request.reply_to = self.receiver.remote_source.address
            request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-OPERATIONS'}
            self.sender.send(request)

    def run(self):
        Container(self).run()

    def on_message(self, event):
        if event.receiver == self.receiver:
            if event.message.properties['statusCode'] == 200:
                if 'org.apache.qpid.dispatch.router' in event.message.body.keys():
                    if len(event.message.body.keys()) > 2:
                        self.bail(None)
                    else:
                        self.bail('size of keys in message body less than or equal 2')
                else:
                    self.bail('org.apache.qpid.dispatch.router is not in the keys')
            else:
                self.bail("The return status code is %s. It should be 200" % str(event.message.properties['statusCode']))


class ManagementTest(MessagingHandler):
    def __init__(self, address):
        super(ManagementTest, self).__init__()
        self.address = address
        self.timer = None
        self.conn = None
        self.sender = None
        self.receiver = None
        self.sent_count = 0
        self.msg_not_sent = True
        self.error = None
        self.response1 = False
        self.response2 = False

    def timeout(self):
        if not self.response1:
            self.error = "Incorrect response received for message with correlation id C1"
        if not self.response1:
            self.error = self.error + "and incorrect response received for message with correlation id C2"
        self.conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        self.sender = event.container.create_sender(self.conn)
        self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)

    def on_link_opened(self, event):
        if event.receiver == self.receiver:
            request = Message()
            request.address = "amqp:/$management"
            request.reply_to = self.receiver.remote_source.address
            request.correlation_id = "C1"
            request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'}
            self.sender.send(request)

            request = Message()
            request.address = "amqp:/_topo/0/QDR.B/$management"
            request.correlation_id = "C2"
            request.reply_to = self.receiver.remote_source.address
            request.properties = {u'type': u'org.amqp.management', u'name': u'self', u'operation': u'GET-MGMT-NODES'}
            self.sender.send(request)

    def on_message(self, event):
        if event.receiver == self.receiver:
            if event.message.correlation_id == "C1":
                if event.message.properties['statusCode'] == 200 and \
                        event.message.properties['statusDescription'] is not None \
                        and event.message.body == []:
                    self.response1 = True
            elif event.message.correlation_id == "C2":
                if event.message.properties['statusCode'] == 200 and \
                        event.message.properties['statusDescription'] is not None \
                        and event.message.body == []:
                    self.response2 = True

        if self.response1 and self.response2:
            self.error = None

        if self.error is None:
            self.timer.cancel()
            self.conn.close()

    def run(self):
        Container(self).run()



class CustomTimeout(object):
    def __init__(self, parent):
        self.parent = parent

    def addr_text(self, addr):
        if not addr:
            return ""
        if addr[0] == 'M':
            return addr[2:]
        else:
            return addr[1:]

    def on_timer_task(self, event):
        local_node = Node.connect(self.parent.address, timeout=TIMEOUT)

        res = local_node.query('org.apache.qpid.dispatch.router.address')
        name = res.attribute_names.index('name')
        found = False
        for results in res.results:
            if "balanced.1" == self.addr_text(results[name]):
                found = True
                break

        if found:
            self.parent.cancel_custom()
            self.parent.create_sender(event)

        else:
            event.reactor.schedule(2, self)


class SemanticsBalanced(MessagingHandler):
    def __init__(self, address):
        super(SemanticsBalanced, self).__init__(auto_accept=False, prefetch=0)
        self.address = address
        self.dest = "balanced.1"
        self.timer = None
        self.conn = None
        self.sender = None
        self.receiver_a = None
        self.receiver_b = None
        self.receiver_c = None
        self.num_messages = 250
        self.n_received_a = 0
        self.n_received_b = 0
        self.n_received_c = 0
        self.error = None
        self.n_sent = 0
        self.rx_set = []
        self.custom_timer = None

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.custom_timer = event.reactor.schedule(2, CustomTimeout(self))
        self.conn = event.container.connect(self.address)

        # This receiver is on the same router as the sender
        self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")

        # These two receivers are connected to a different router than the sender
        self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
        self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")

        self.receiver_a.flow(100)
        self.receiver_b.flow(100)
        self.receiver_c.flow(100)

    def cancel_custom(self):
        self.custom_timer.cancel()

    def create_sender(self, event):
        self.sender = event.container.create_sender(self.conn, self.dest)

    def timeout(self):
        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
        self.conn.close()

    def check_if_done(self):
        if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages and \
                self.n_received_a > 0 and self.n_received_b > 0 and self.n_received_c > 0:
            self.rx_set.sort()
            all_messages_received = True
            for i in range(self.num_messages):
                if not i == self.rx_set[i]:
                    all_messages_received = False

            if all_messages_received:
                self.timer.cancel()
                self.conn.close()

    def on_sendable(self, event):
        if self.n_sent < self.num_messages:
            msg = Message(body={'number': self.n_sent})
            self.sender.send(msg)
            self.n_sent += 1

    def on_message(self, event):
        if event.receiver == self.receiver_a:
            self.n_received_a += 1
            self.rx_set.append(event.message.body['number'])
        elif event.receiver == self.receiver_b:
            self.n_received_b += 1
            self.rx_set.append(event.message.body['number'])
        elif event.receiver == self.receiver_c:
            self.n_received_c += 1
            self.rx_set.append(event.message.body['number'])

        self.check_if_done()

    def run(self):
        Container(self).run()


class Timeout(object):
    def __init__(self, parent):
        self.parent = parent

    def on_timer_task(self, event):
        self.parent.timeout()


class PreSettled ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( PreSettled, self ) . __init__ ( prefetch = n_messages )
        self.addr       = addr
        self.n_messages = n_messages

        self.sender     = None
        self.receiver   = None
        self.n_sent     = 0
        self.n_received = 0
        self.error      = None
        self.test_timer = None


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired: %d messages received, %d expected." % (self.n_received, self.n_messages) )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )
        self.sender   = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver = event.container.create_receiver ( self.send_conn, self.addr )
        self.receiver.flow ( self.n_messages )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            # Presettle the delivery.
            dlv = self.sender.send ( msg )
            dlv.settle()
            self.n_sent += 1


    def on_message ( self, event ) :
        self.n_received += 1
        if self.n_received >= self.n_messages :
            self.bail ( None )


class PresettledCustomTimeout(object):
    def __init__(self, parent):
        self.parent = parent

    def on_timer_task(self, event):
        local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
        res = local_node.query('org.apache.qpid.dispatch.router')
        deliveries_ingress = res.attribute_names.index(
            'deliveriesIngress')
        ingress_delivery_count = res.results[0][deliveries_ingress]
        self.parent.cancel_custom()

        # Without the fix for DISPATCH--1213  the ingress count will be less than
        # 200 because the sender link has stalled. The q2_holdoff happened
        # and so all the remaining messages are still in the
        # proton buffers.

        if ingress_delivery_count - self.parent.begin_ingress_count > self.parent.n_messages:
            self.parent.bail(None)
        else:
            self.parent.bail("Messages sent to the router is %d, "
                             "Messages processed by the router is %d" %
                             (self.parent.n_messages,
                              ingress_delivery_count - self.parent.begin_ingress_count))


class DroppedPresettledTest(MessagingHandler):
    def __init__(self, addr, n_messages, begin_ingress_count):
        super (DroppedPresettledTest, self).__init__()
        self.addr = addr
        self.n_messages = n_messages
        self.sender = None
        self.receiver = None
        self.sender_conn = None
        self.recv_conn = None
        self.n_sent = 0
        self.n_received = 0
        self.error = None
        self.test_timer = None
        self.max_receive = 10
        self.custom_timer = None
        self.timer = None
        self.begin_ingress_count = begin_ingress_count
        self.str1 = "0123456789abcdef"
        self.msg_str = ""
        for i in range(8192):
            self.msg_str += self.str1

    def run (self):
        Container(self).run()

    def bail(self, travail):
        self.error = travail
        self.sender_conn.close()
        if self.recv_conn:
            self.recv_conn.close()
        self.timer.cancel()

    def timeout(self,):
        self.bail("Timeout Expired: %d messages received, %d expected." %
                  (self.n_received, self.n_messages))

    def on_start (self, event):
        self.sender_conn = event.container.connect(self.addr)
        self.recv_conn = event.container.connect(self.addr)
        self.receiver = event.container.create_receiver(self.recv_conn,
                                                        "test_43")
        self.sender = event.container.create_sender(self.sender_conn,
                                                    "test_43")
        self.timer = event.reactor.schedule(10, Timeout(self))

    def cancel_custom(self):
        self.custom_timer.cancel()

    def on_sendable(self, event):
        while self.n_sent < self.n_messages:
            msg = Message(id=(self.n_sent + 1),
                          body={'sequence': (self.n_sent + 1),
                                'msg_str': self.msg_str})
            # Presettle the delivery.
            dlv = self.sender.send (msg)
            dlv.settle()
            self.n_sent += 1

    def on_message(self, event):
        self.n_received += 1
        if self.n_received == self.max_receive:
            # Receiver bails after receiving max_receive messages.
            self.receiver.close()
            self.recv_conn.close()

            # The sender is only sending 200 large messages which is less
            # that the initial credit of 250 that the router gives.
            # Lets do a qdstat to find out if all 200 messages is handled
            # by the router.
            self.custom_timer = event.reactor.schedule(1,
                                                       PresettledCustomTimeout(
                                                           self))

class MulticastUnsettled ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages,
                   n_receivers
                 ) :
        super ( MulticastUnsettled, self ) . __init__ (auto_accept=False, prefetch=n_messages)
        self.addr        = addr
        self.n_messages  = n_messages
        self.n_receivers = n_receivers

        self.sender     = None
        self.receivers  = list ( )
        self.n_sent     = 0
        self.n_received = list ( )
        self.error      = None
        self.test_timer = None
        self.bailing    = False


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.recv_conn = event.container.connect ( self.addr )
        for i in range ( self.n_receivers ) :
            rcvr = event.container.create_receiver ( self.recv_conn, self.addr, name = "receiver_" + str(i) )
            rcvr.flow ( self.n_messages )

        self.test_timer = event.reactor.schedule ( 15, MultiTimeout(self, "test") )

    def on_link_opened(self, event):
        if event.receiver:
            self.receivers.append(event.receiver)
            self.n_received.append(0)
            # start the sender once all receivers links are up
            if len(self.receivers) == self.n_receivers:
                self.send_conn = event.container.connect(self.addr)
                self.sender = event.container.create_sender(self.send_conn, self.addr)

    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            for i in range ( self.n_messages ) :
                msg = Message ( body = i )
                # The sender does not settle, but the
                # receivers will..
                self.sender.send ( msg )
                self.n_sent += 1

    def on_message ( self, event ) :
        if self.bailing :
            return
        event.delivery.settle()
        for i in range ( self.n_receivers ) :
            if event.receiver == self.receivers [ i ] :
                # Body conetnts of the messages count from 0 ... n,
                # so the contents of this message should be same as
                # the current number of messages received by this receiver.
                if self.n_received [ i ] != event.message.body :
                    self.bail ( "out of order or missed message: receiver %d got %d instead of %d" %
                                ( i, event.message.body, self.n_received [ i ] )
                              )
                    return
                self.n_received [ i ] += 1
                self.check_n_received ( )


    def check_n_received ( self ) :
        for i in range ( self.n_receivers ) :
            if self.n_received [ i ] < self.n_messages :
                return
        # All messages have been received by all receivers.
        self.bail ( None )




class DispositionReturnsToClosedConnection ( MessagingHandler ) :

    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( DispositionReturnsToClosedConnection, self ) . __init__ ( prefetch = n_messages )
        self.addr       = addr
        self.n_messages = n_messages

        self.n_sent     = 0
        self.n_received = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.test_timer.cancel ( )
        self.error = travail
        if self.send_conn :
            self.send_conn.close ( )
        self.recv_conn.close ( )


    def timeout ( self, name ) :
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )
        self.sender   = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer = event.reactor.schedule ( 15, MultiTimeout ( self, "test" ) )


    def on_sendable ( self, event ) :

        if not self.send_conn :
            return

        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            self.sender.send ( msg )
            self.n_sent += 1

        # Immediately upon finishing sending all the messages, the 
        # sender closes its connection, so that when the dispositions
        # try to come back they will find no one who cares.
        # The only problem I can directly detect here is a test 
        # timeout. And, indirectly, we are making sure that the router
        # does not blow sky high.
        if self.n_sent >= self.n_messages :
            self.send_conn.close()
            self.send_conn = None


    # On the receiver side, we keep accepting and settling 
    # messages, tragically unaware that no one cares.
    def on_message ( self, event ) :
        event.delivery.update ( Delivery.ACCEPTED )
        event.delivery.settle ( )
        self.n_received += 1
        if self.n_received >= self.n_messages :
            self.bail ( None )





class SenderSettlesFirst ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( SenderSettlesFirst, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer = None
        self.sender     = None
        self.receiver   = None
        self.n_sent     = 0
        self.n_received = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            # Settle the delivery immediately after sending.
            dlv = self.sender.send ( msg )
            dlv.settle()
            self.n_sent += 1


    def on_message ( self, event ) :
        self.n_received += 1
        event.delivery.settle ( )
        if self.n_received >= self.n_messages :
            self.bail ( None )




class PropagatedDisposition ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( PropagatedDisposition, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer = None
        self.sender     = None
        self.receiver   = None
        self.n_sent     = 0
        self.n_received = 0
        self.n_accepted = 0
        self.bailing    = False


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    # Sender Side ================================================
    def on_sendable ( self, event ) :
        if self.bailing :
            return
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            dlv = self.sender.send ( msg )
            if dlv.remote_state != 0 :
                self.bail ( "remote state nonzero on send." )
                break
            if not dlv.pending :
                self.bail ( "dlv not pending immediately after send." )
                break

            self.n_sent += 1


    def on_accepted ( self, event ) :
        if self.bailing :
            return
        dlv = event.delivery
        if dlv.pending :
            self.bail ( "Delivery still pending after accepted." )
            return
        if dlv.remote_state != Delivery.ACCEPTED :
            self.bail ( "Delivery remote state is not ACCEPTED after accept." )
            return
        self.n_accepted += 1
        if self.n_accepted >= self.n_messages :
            # Success!
            self.bail ( None )


    # Receiver Side ================================================
    def on_message ( self, event ) :
        if self.bailing :
            return
        self.n_received += 1
        dlv = event.delivery
        if dlv.pending :
            self.bail ( 'Delivery still pending at receiver.' )
            return
        if dlv.local_state != 0 :
            self.bail ( 'At receiver: delivery local state nonzero at receiver before accept.' )
            return
        dlv.update ( Delivery.ACCEPTED )




class UsettledUndeliverable ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( UsettledUndeliverable, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer = None
        self.sender     = None
        self.n_sent     = 0
        self.n_received = 0
        self.bailing    = False


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        if self.n_sent > 0 :
            self.bail ( "Messages sent with no receiver." )
        else :
            self.bail ( None )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.sender    = event.container.create_sender ( self.send_conn, self.addr )
        # Uh-oh. We are not creating a receiver! 
        self.test_timer = event.reactor.schedule ( 5, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            msg = Message ( body = self.n_sent )
            dlv = self.sender.send ( msg )
            dlv.settle()
            self.n_sent += 1


    def on_message ( self, event ) :
        self.n_received += 1




class ThreeAck ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( ThreeAck, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer = None
        self.sender     = None
        self.receiver   = None
        self.n_sent     = 0
        self.n_received = 0
        self.n_accepted = 0
        self.bailing    = False
        self.tmp_dlv    = None


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    # Sender Side ================================================
    def on_sendable ( self, event ) :
        if self.bailing :
            return
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            dlv = self.sender.send ( msg )

            self.n_sent += 1


    def on_accepted ( self, event ) :
        if self.bailing :
            return
        dlv = event.delivery
        if dlv.remote_state != Delivery.ACCEPTED :
            self.bail ( "Delivery remote state is not ACCEPTED in on_accepted." )
            return
        # When sender knows that receiver has accepted, we settle.
        # That's two-ack.
        dlv.settle()
        self.n_accepted += 1
        if self.n_accepted >= self.n_messages :
            # Success!
            self.bail ( None )


    # Receiver Side ================================================
    def on_message ( self, event ) :
        if self.bailing :
            return
        dlv = event.delivery
        dlv.update ( Delivery.ACCEPTED )
        if event.message.body != self.n_received :
            self.bail ( "out-of-order message" )
            return
        self.n_received += 1
        if self.tmp_dlv == None :
            self.tmp_dlv = dlv

    # We have no way, on receiver side, of tracking when sender settles.
    # See PROTON-395 .




class MessageAnnotations ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( MessageAnnotations, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer = None
        self.sender     = None
        self.receiver   = None
        self.n_sent     = 0
        self.n_received = 0
        self.bailing    = False


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :

        if event.sender.credit < 1 :
            return
        # No added annotations.
        msg = Message ( body = self.n_sent )
        self.n_sent += 1
        self.sender.send ( msg )

        # Add an annotation.
        msg = Message ( body = self.n_sent )
        self.n_sent += 1
        msg.annotations = { 'x-opt-qd.ingress': 'i_changed_the_annotation' }
        self.sender.send ( msg )

        # Try to supply an invalid type for trace.
        msg = Message ( body = self.n_sent )
        self.n_sent += 1
        msg.annotations = { 'x-opt-qd.trace' : 45 }
        self.sender.send ( msg )

        # Add a value to the trace list.
        msg = Message ( body = self.n_sent )
        self.n_sent += 1
        msg.annotations = { 'x-opt-qd.trace' : [ '0/first-hop' ] }
        self.sender.send ( msg )


    def on_message ( self, event ) :
        ingress_router_name = '0/QDR'
        self.n_received += 1
        if self.n_received >= self.n_messages :
            self.bail ( None )
            return

        annotations = event.message.annotations

        if self.n_received == 1 :
            if annotations [ 'x-opt-qd.ingress' ] != ingress_router_name :
                self.bail ( 'Bad ingress router name on msg %d' % self.n_received )
                return
            if annotations [ 'x-opt-qd.trace' ] != [ ingress_router_name ] :
                self.bail ( 'Bad trace on msg %d.' % self.n_received )
                return

        elif self.n_received == 2 :
            if annotations [ 'x-opt-qd.ingress' ] != 'i_changed_the_annotation' :
                self.bail ( 'Bad ingress router name on msg %d' % self.n_received )
                return
            if annotations [ 'x-opt-qd.trace' ] != [ ingress_router_name ] :
                self.bail ( 'Bad trace on msg %d .' % self.n_received )
                return

        elif self.n_received == 3 :
            # The invalid type for trace has no effect.
            if annotations [ 'x-opt-qd.ingress' ] != ingress_router_name :
                self.bail ( 'Bad ingress router name on msg %d ' % self.n_received )
                return
            if annotations [ 'x-opt-qd.trace' ] != [ ingress_router_name ] :
                self.bail ( 'Bad trace on msg %d' % self.n_received )
                return

        elif self.n_received == 4 :
            if annotations [ 'x-opt-qd.ingress' ] != ingress_router_name :
                self.bail ( 'Bad ingress router name on msg %d ' % self.n_received )
                return
            # The sender prepended a value to the trace list.
            if annotations [ 'x-opt-qd.trace' ] != [ '0/first-hop', ingress_router_name ] :
                self.bail ( 'Bad trace on msg %d' % self.n_received )
                return
            # success
            self.bail ( None )




class StripMessageAnnotationsCustom ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( StripMessageAnnotationsCustom, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer  = None
        self.sender      = None
        self.receiver    = None
        self.n_sent      = 0
        self.n_received  = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            self.n_sent += 1
            msg.annotations = { 'custom-annotation' : '1/Custom_Annotation' }

            self.sender.send ( msg )


    def on_message ( self, event ) :
        self.n_received += 1
        if not 'custom-annotation' in event.message.annotations :
            self.bail ( 'custom annotation not found' )
            return
        if event.message.annotations [ 'custom-annotation'] != '1/Custom_Annotation' :
            self.bail ( 'custom annotation bad value' )
            return
        if self.n_received >= self.n_messages :
            # success
            self.bail ( None )



class StripMessageAnnotationsNo ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( StripMessageAnnotationsNo, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer  = None
        self.sender      = None
        self.receiver    = None
        self.n_sent      = 0
        self.n_received  = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            self.n_sent += 1
            # This test has no added annotations.
            # The receiver should get the expected standard annotations anyway,
            # because the address we are using has 'stripAnnotations' set to 'no'.
            msg.annotations = { }
            self.sender.send ( msg )


    def on_message ( self, event ) :
        self.n_received += 1

        if event.message.annotations [ 'x-opt-qd.ingress' ] != '0/QDR' :
            self.bail ( "x-opt-qd.ingress annotation has been stripped!" )
            return

        if event.message.annotations [ 'x-opt-qd.trace' ] != [ '0/QDR' ] :
            self.bail ( "x-opt-qd.trace annotations has been stripped!" )
            return

        if self.n_received >= self.n_messages :
            # success
            self.bail ( None )



class StripMessageAnnotationsNoAddTrace ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( StripMessageAnnotationsNoAddTrace, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer  = None
        self.sender      = None
        self.receiver    = None
        self.n_sent      = 0
        self.n_received  = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            annotations = {'Canis_meus' : 'id_comedit',
                           'x-opt-qd.ingress': 'ingress-router',
                           'x-opt-qd.trace': ['0/QDR.1']
            }
            self.n_sent += 1
            # This test has no added annotations.
            # The receiver should get the expected standard annotations anyway,
            # because the address we are using has 'stripAnnotations' set to 'no'.
            msg.annotations = annotations
            self.sender.send ( msg )


    def on_message ( self, event ) :
        self.n_received += 1

        notes = event.message.annotations

        if not isinstance(notes, dict):
            self.bail("annotations are not a dictionary")
            return

        # No annotations should get stripped -- neither the
        # ones that the router adds, not the custome one that
        # I added.
        if not 'x-opt-qd.ingress' in notes :
            self.bail ( 'x-opt-qd.ingress annotation missing' )
            return
        if not 'x-opt-qd.trace' in notes :
            self.bail ( 'x-opt-qd.trace annotation missing' )
            return
        if not 'Canis_meus' in notes :
            self.bail ( 'Canis_meus annotation missing' )
            return

        if notes [ 'x-opt-qd.ingress' ] != 'ingress-router' :
            self.bail ( "x-opt-qd.ingress bad value" )
            return
        if notes [ 'x-opt-qd.trace' ] != ['0/QDR.1', '0/QDR'] :
            self.bail ( "x-opt-qd.trace bad value" )
            return
        if notes [ 'Canis_meus' ] != 'id_comedit' :
            self.bail ( "Canis_meus bad value" )
            return

        if self.n_received >= self.n_messages :
            # success
            self.bail ( None )



class StripMessageAnnotationsBoth ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( StripMessageAnnotationsBoth, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer  = None
        self.sender      = None
        self.receiver    = None
        self.n_sent      = 0
        self.n_received  = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            annotations = { 'Canis_meus' : 'id_comedit',
                            'x-opt-qd.ingress': 'ingress-router',
                            'x-opt-qd.trace': ['0/QDR.1'],
                          }
            self.n_sent += 1
            # This test has no added annotations.
            # The receiver should get the expected standard annotations anyway,
            # because the address we are using has 'stripAnnotations' set to 'no'.
            msg.annotations = annotations
            self.sender.send ( msg )


    def on_message ( self, event ) :
        self.n_received += 1

        # The annotations that the router adds should get stripped,
        # but not the custom one that I added.
        notes = event.message.annotations
        if 'x-opt-qd.ingress' in notes :
            self.bail ( 'x-opt-qd.ingress annotation not stripped' )
            return
        if 'x-opt-qd.trace' in notes :
            self.bail ( 'x-opt-qd.trace annotation not stripped' )
            return
        if not 'Canis_meus' in notes :
            self.bail ( 'Canis_meus annotation missing' )
            return

        if notes [ 'Canis_meus' ] != 'id_comedit' :
            self.bail ( "Canis_meus bad value" )
            return

        if self.n_received >= self.n_messages :
            # success
            self.bail ( None )


class StripMessageAnnotationsOut ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( StripMessageAnnotationsOut, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer  = None
        self.sender      = None
        self.receiver    = None
        self.n_sent      = 0
        self.n_received  = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            self.n_sent += 1
            # This test has no added annotations.
            # The receiver should get the expected standard annotations anyway,
            # because the address we are using has 'stripAnnotations' set to 'no'.
            self.sender.send ( msg )


    def on_message ( self, event ) :
        self.n_received += 1

        # The annotations that the router routinely adds 
        # should all get stripped,
        if event.message.annotations != None :
            self.bail ( "An annotation was not stripped in egress message." )
            return

        if self.n_received >= self.n_messages :
            # success
            self.bail ( None )




class StripMessageAnnotationsIn ( MessagingHandler ) :
    def __init__ ( self,
                   addr,
                   n_messages
                 ) :
        super ( StripMessageAnnotationsIn, self ) . __init__ ( prefetch = n_messages )
        self.addr        = addr
        self.n_messages  = n_messages

        self.test_timer  = None
        self.sender      = None
        self.receiver    = None
        self.n_sent      = 0
        self.n_received  = 0


    def run ( self ) :
        Container(self).run()


    def bail ( self, travail ) :
        self.bailing = True
        self.error = travail
        self.send_conn.close ( )
        self.recv_conn.close ( )
        self.test_timer.cancel ( )


    def timeout ( self, name ):
        self.bail ( "Timeout Expired" )


    def on_start ( self, event ):
        self.send_conn = event.container.connect ( self.addr )
        self.recv_conn = event.container.connect ( self.addr )

        self.sender      = event.container.create_sender   ( self.send_conn, self.addr )
        self.receiver    = event.container.create_receiver ( self.recv_conn, self.addr )
        self.test_timer  = event.reactor.schedule ( 15, MultiTimeout(self, "test") )


    def on_sendable ( self, event ) :
        while self.n_sent < self.n_messages :
            if event.sender.credit < 1 :
                break
            msg = Message ( body = self.n_sent )
            # Attach some standard annotations to the message.
            # These are ingress annotations, and should get stripped.
            # These annotation-keys will then get values assigned by
            # the router.
            notes = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']}
            self.sender.send ( msg )
            self.n_sent += 1


    def on_message ( self, event ) :
        self.n_received += 1

        if event.message.annotations [ 'x-opt-qd.ingress' ] == 'ingress-router' :
            self.bail ( "x-opt-qd.ingress value was not stripped." )
            return

        if event.message.annotations [ 'x-opt-qd.trace' ] == ['0/QDR.1'] :
            self.bail ( "x-opt-qd.trace value was not stripped." )
            return

        if self.n_received >= self.n_messages :
            # success
            self.bail ( None )






HELLO_WORLD = "Hello World!"

class SndSettleModeTest(MessagingHandler):
    def __init__(self, address):
        super(SndSettleModeTest, self).__init__()
        self.address = address
        self.sender = None
        self.receiver = None
        self.message_received = False
        self.delivery_already_settled = False

    def on_start(self, event):
        conn = event.container.connect(self.address)
        # The receiver sets link.snd_settle_mode = Link.SND_SETTLED. It wants to receive settled messages
        self.receiver = event.container.create_receiver(conn, "org/apache/dev", options=AtMostOnce())

        # With AtLeastOnce, the sender will not settle.
        self.sender = event.container.create_sender(conn, "org/apache/dev", options=AtLeastOnce())

    def on_sendable(self, event):
        msg = Message(body=HELLO_WORLD)
        event.sender.send(msg)
        event.sender.close()

    def on_message(self, event):
        self.delivery_already_settled = event.delivery.settled
        if HELLO_WORLD == event.message.body:
            self.message_received = True
        else:
            self.message_received = False
        event.connection.close()

    def run(self):
        Container(self).run()


class ExcessDeliveriesReleasedTest(MessagingHandler):
    def __init__(self, address):
        super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
        self.address = address
        self.dest = "closest.EDRtest"
        self.error = None
        self.sender = None
        self.receiver = None
        self.n_sent     = 0
        self.n_received = 0
        self.n_accepted = 0
        self.n_released = 0

    def on_start(self, event):
        conn = event.container.connect(self.address)
        self.sender   = event.container.create_sender(conn, self.dest)
        self.receiver = event.container.create_receiver(conn, self.dest)
        self.receiver.flow(6)

    def on_sendable(self, event):
        for i in range(10 - self.n_sent):
            msg = Message(body=i)
            event.sender.send(msg)
            self.n_sent += 1

    def on_accepted(self, event):
        self.n_accepted += 1

    def on_released(self, event):
        self.n_released += 1
        if self.n_released == 4:
            if self.n_accepted != 6:
                self.error = "Expected 6 accepted, got %d" % self.n_accepted
            if self.n_received != 6:
                self.error = "Expected 6 received, got %d" % self.n_received

            event.connection.close()

    def on_message(self, event):
        self.n_received += 1
        if self.n_received == 6:
            self.receiver.close()

    def run(self):
        Container(self).run()

class UnavailableBase(MessagingHandler):
    def __init__(self, address):
        super(UnavailableBase, self).__init__()
        self.address = address
        self.dest = "unavailable"
        self.conn = None
        self.sender = None
        self.receiver = None
        self.link_error = False
        self.link_closed = False
        self.passed = False
        self.timer = None
        self.link_name = "test_link"

    def check_if_done(self):
        if self.link_error and self.link_closed:
            self.passed = True
            self.conn.close()
            self.timer.cancel()

    def on_link_error(self, event):
        link = event.link
        if event.link.name == self.link_name and link.remote_condition.description \
                == "Node not found":
            self.link_error = True
        self.check_if_done()

    def on_link_remote_close(self, event):
        if event.link.name == self.link_name:
            self.link_closed = True
            self.check_if_done()

    def run(self):
        Container(self).run()

class UnavailableSender(UnavailableBase):
    def __init__(self, address):
        super(UnavailableSender, self).__init__(address)

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        # Creating a sender to an address with unavailable distribution
        # The router will not allow this link to be established. It will close the link with an error of
        # "Node not found"
        self.sender = event.container.create_sender(self.conn, self.dest, name=self.link_name)

class UnavailableReceiver(UnavailableBase):
    def __init__(self, address):
        super(UnavailableReceiver, self).__init__(address)

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        # Creating a receiver to an address with unavailable distribution
        # The router will not allow this link to be established. It will close the link with an error of
        # "Node not found"
        self.receiver = event.container.create_receiver(self.conn, self.dest, name=self.link_name)

class MulticastUnsettledTest(MessagingHandler):
    """
    Send N unsettled multicast messages to 2 receivers.  Ensure sender is
    notified of settlement and disposition changes from the receivers.
    """
    def __init__(self, address):
        super(MulticastUnsettledTest, self).__init__(auto_accept=False, prefetch=0)
        self.address = address
        self.dest = "multicast.MUtest"
        self.error = None
        self.count      = 10
        self.n_sent     = 0
        self.n_received = 0
        self.n_accepted = 0
        self.n_receivers = 0

    def check_if_done(self):
        if self.n_received == self.count * 2 and self.n_accepted == self.count:
            self.timer.cancel()
            self.conn.close()

    def timeout(self):
        self.error = "Timeout Expired: sent=%d, received=%d, accepted=%d" % (self.n_sent, self.n_received, self.n_accepted)
        self.conn.close()

    def on_start(self, event):
        self.timer     = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn      = event.container.connect(self.address)
        self.receiver1 = event.container.create_receiver(self.conn, self.dest,
                                                         name="A",
                                                         options=AtLeastOnce())
        self.receiver2 = event.container.create_receiver(self.conn, self.dest,
                                                         name="B",
                                                         options=AtLeastOnce());
        self.receiver1.flow(self.count)
        self.receiver2.flow(self.count)

    def on_link_opened(self, event):
        if event.receiver:
            self.n_receivers += 1
            # start the sender once all receivers links are up
            if self.n_receivers == 2:
                self.sender = event.container.create_sender(self.conn, self.dest,
                                                            options=AtLeastOnce())

    def on_sendable(self, event):
        for i in range(self.count - self.n_sent):
            msg = Message(body=i)
            event.sender.send(msg)
            self.n_sent += 1

    def on_accepted(self, event):
        self.n_accepted += 1
        self.check_if_done()

    def on_message(self, event):
        if event.delivery.settled:
            self.error = "Received settled delivery"
        event.delivery.update(Delivery.ACCEPTED)
        event.delivery.settle()
        self.n_received += 1
        self.check_if_done()

    def run(self):
        Container(self).run()


class LargeMessageStreamCloseConnTest(MessagingHandler):
    def __init__(self, address):
        super(LargeMessageStreamCloseConnTest, self).__init__()
        self.address = address
        self.dest = "LargeMessageStreamCloseConnTest"
        self.error = None
        self.timer = None
        self.sender_conn = None
        self.receiver_conn = None
        self.sender = None
        self.receiver = None
        self.body = ""
        self.aborted = False
        for i in range(20000):
            self.body += "0123456789101112131415"

    def timeout(self):
        if self.aborted:
            self.error = "Message has been aborted. Test failed"
        else:
            self.error = "Message not received. test failed"
        self.receiver_conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.sender_conn = event.container.connect(self.address)
        self.receiver_conn = event.container.connect(self.address)
        self.sender = event.container.create_sender(self.sender_conn, self.dest)
        self.receiver = event.container.create_receiver(self.receiver_conn,
                                                        self.dest, name="A")

    def on_sendable(self, event):
        msg = Message(body=self.body)
        # send(msg) calls the stream function which streams data
        # from sender to the router
        event.sender.send(msg)

        # Close the connection immediately after sending the message
        # Without the fix for DISPATCH-1085, this test will fail
        # one in five times with an abort
        # With the fix in place, this test will never fail (the
        # on_aborted will never be called).
        self.sender_conn.close()

    def on_message(self, event):
        self.timer.cancel()
        self.receiver_conn.close()

    def on_aborted(self, event):
        self.aborted = True
        self.timer.cancel()
        self.timeout()

    def run(self):
        Container(self).run()


class LargeMessageStreamTest(MessagingHandler):
    def __init__(self, address):
        super(LargeMessageStreamTest, self).__init__()
        self.address = address
        self.dest = "LargeMessageStreamTest"
        self.error = None
        self.count = 10
        self.n_sent = 0
        self.timer = None
        self.conn = None
        self.sender = None
        self.receiver = None
        self.n_received = 0
        self.body = ""
        for i in range(10000):
            self.body += "0123456789101112131415"

    def check_if_done(self):
        if self.n_received == self.count:
            self.timer.cancel()
            self.conn.close()

    def timeout(self):
        self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
        self.conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        self.sender = event.container.create_sender(self.conn, self.dest)
        self.receiver = event.container.create_receiver(self.conn, self.dest, name="A")
        self.receiver.flow(self.count)

    def on_sendable(self, event):
        for i in range(self.count):
            msg = Message(body=self.body)
            # send(msg) calls the stream function which streams data from sender to the router
            event.sender.send(msg)
            self.n_sent += 1

    def on_message(self, event):
        self.n_received += 1
        self.check_if_done()

    def run(self):
        Container(self).run()


class MultiframePresettledTest(MessagingHandler):
    def __init__(self, address):
        super(MultiframePresettledTest, self).__init__(prefetch=0)
        self.address = address
        self.dest = "closest.MFPtest"
        self.error = None
        self.count      = 10
        self.n_sent     = 0
        self.n_received = 0

        self.body = ""
        for i in range(10000):
            self.body += "0123456789"

    def check_if_done(self):
        if self.n_received == self.count:
            self.timer.cancel()
            self.conn.close()

    def timeout(self):
        self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
        self.conn.close()

    def on_start(self, event):
        self.timer     = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn      = event.container.connect(self.address)
        self.sender    = event.container.create_sender(self.conn, self.dest)
        self.receiver  = event.container.create_receiver(self.conn, self.dest, name="A")
        self.receiver.flow(self.count)

    def on_sendable(self, event):
        for i in range(self.count - self.n_sent):
            msg = Message(body=self.body)
            dlv = event.sender.send(msg)
            dlv.settle()
            self.n_sent += 1

    def on_message(self, event):
        if not event.delivery.settled:
            self.error = "Received unsettled delivery"
        self.n_received += 1
        self.check_if_done()

    def run(self):
        Container(self).run()


class UptimeLastDlvChecker(object):
    def __init__(self, parent, lastDlv=None, uptime=0):
        self.parent = parent
        self.uptime = uptime
        self.lastDlv = lastDlv
        self.expected_num_connections = 2
        self.num_connections = 0

    def on_timer_task(self, event):
        local_node = Node.connect(self.parent.address, timeout=TIMEOUT)
        result = local_node.query('org.apache.qpid.dispatch.connection')
        container_id_index = result.attribute_names.index('container')
        uptime_seconds_index = result.attribute_names.index('uptimeSeconds')
        last_dlv_seconds_index = result.attribute_names.index('lastDlvSeconds')
        for res in result.results:
            container_id = res[container_id_index]

            # We only care if the container_id is "UPTIME-TEST"
            if container_id == self.parent.container_id:
                uptime_seconds = res[uptime_seconds_index]
                if self.uptime != 0 and uptime_seconds < self.uptime:
                    self.parent.error = "The connection uptime should be greater than or equal to %d seconds but instead is %d seconds" % (self.uptime, uptime_seconds)
                last_dlv_seconds = res[last_dlv_seconds_index]
                if self.lastDlv is None:
                    if last_dlv_seconds is not None:
                        self.parent.error = "Expected lastDlvSeconds to be empty"
                else:
                    if not last_dlv_seconds >= self.lastDlv:
                        self.parent.error = "Connection lastDeliverySeconds must be greater than or equal to $d but is %d" % (self.lastDlv, last_dlv_seconds)
                    else:
                        self.parent.success = True
                self.num_connections += 1

        if self.expected_num_connections != self.num_connections:
            self.parent.error = "Number of client connections expected=%d, but got %d" % (self.expected_num_connections, self.num_connections)

        self.parent.cancel_custom()


class ConnectionUptimeLastDlvTest(MessagingHandler):
    def __init__(self, address, dest):
        super(ConnectionUptimeLastDlvTest, self).__init__()
        self.timer = None
        self.sender_conn = None
        self.receiver_conn = None
        self.address = address
        self.sender = None
        self.receiver = None
        self.error = None
        self.custom_timer = None
        self.container_id = "UPTIME-TEST"
        self.dest = dest
        self.reactor = None
        self.success = False

    def cancel_custom(self):
        self.custom_timer.cancel()
        if self.error or self.success:
            self.timer.cancel()
            self.sender_conn.close()
            self.receiver_conn.close()
        else:
            msg = Message(body=self.container_id)
            self.sender.send(msg)

            # We have now sent a message that the router must have sent to the
            # receiver. We will wait for 2 seconds and once again check
            # uptime and lastDlv
            self.custom_timer = self.reactor.schedule(2, UptimeLastDlvChecker(self, uptime=7, lastDlv=2))

    def timeout(self):
        self.error = "Timeout Expired:, Test took too long to execute. "
        self.sender_conn.close()
        self.receiver_conn.close()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.sender_conn = event.container.connect(self.address)
        self.receiver_conn = event.container.connect(self.address)

        # Let's create a sender and receiver but not send any messages.
        self.sender = event.container.create_sender(self.sender_conn, self.dest)
        self.receiver = event.container.create_receiver(self.receiver_conn, self.dest)

        # Execute a management query for connections after 5 seconds
        # This will help us check the uptime and lastDlv time
        # No deliveries were sent on any link yet, so the lastDlv must be "-"
        self.reactor = event.reactor
        self.custom_timer = event.reactor.schedule(5, UptimeLastDlvChecker(self, uptime=5, lastDlv=None))

    def run(self):
        container = Container(self)
        container.container_id = self.container_id
        container.run()


class AnonymousSenderNoRecvLargeMessagedTest(MessagingHandler):
    def __init__(self, address):
        super(AnonymousSenderNoRecvLargeMessagedTest, self).__init__(auto_accept=False)
        self.timer = None
        self.conn = None
        self.sender = None
        self.address = address
        self.released = False
        self.error = None
        self.body = ""
        for i in range(20000):
            self.body += "0123456789101112131415"

    def timeout(self):
        self.error = "Timeout Expired:, delivery not released. "
        self.conn.close()

    def check_if_done(self):
        if self.released:
            self.sender.close()
            self.conn.close()
            self.timer.cancel()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn = event.container.connect(self.address)
        # This sender is an anonymous sender
        self.sender = event.container.create_sender(self.conn)

    def on_sendable(self, event):
        msg = Message(body=self.body, address="someaddress")
        # send(msg) calls the stream function which streams data from sender to the router
        event.sender.send(msg)

    def on_released(self, event):
        self.released = True
        self.check_if_done()

    def run(self):
        Container(self).run()


class ReleasedVsModifiedTest(MessagingHandler):
    def __init__(self, address):
        super(ReleasedVsModifiedTest, self).__init__(prefetch=0, auto_accept=False)
        self.address = address
        self.dest = "closest.RVMtest"
        self.error = None
        self.count      = 10
        self.accept     = 6
        self.n_sent     = 0
        self.n_received = 0
        self.n_released = 0
        self.n_modified = 0
        self.node_modified_at_start = 0

    def get_modified_deliveries ( self ) :
        local_node = Node.connect(self.address, timeout=TIMEOUT)
        outs = local_node.query(type='org.apache.qpid.dispatch.router')
        pos = outs.attribute_names.index("modifiedDeliveries")
        results = outs.results[0]
        n_modified_deliveries = results [ pos ]
        return n_modified_deliveries


    def check_if_done(self):
        if self.n_received == self.accept and self.n_released == self.count - self.accept and self.n_modified == self.accept:
            node_modified_now = self.get_modified_deliveries ( )
            this_test_modified_deliveries = node_modified_now - self.node_modified_at_start
            if this_test_modified_deliveries == self.accept:
                self.timer.cancel()
                self.conn.close()

    def timeout(self):
        self.error = "Timeout Expired: sent=%d, received=%d, released=%d, modified=%d" % \
                     (self.n_sent, self.n_received, self.n_released, self.n_modified)
        self.conn.close()

    def on_start(self, event):
        self.timer     = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn      = event.container.connect(self.address)
        self.sender    = event.container.create_sender(self.conn, self.dest)
        self.receiver  = event.container.create_receiver(self.conn, self.dest, name="A")
        self.receiver.flow(self.accept)
        self.node_modified_at_start = self.get_modified_deliveries ( )

    def on_sendable(self, event):
        for i in range(self.count - self.n_sent):
            msg = Message(body="RvM-Test")
            event.sender.send(msg)
            self.n_sent += 1

    def on_message(self, event):
        self.n_received += 1
        if self.n_received == self.accept:
            self.receiver.close()

    def on_released(self, event):
        if event.delivery.remote_state == Delivery.MODIFIED:
            self.n_modified += 1
        else:
            self.n_released += 1
        self.check_if_done()

    def run(self):
        Container(self).run()


class AppearanceOfBalanceTest(MessagingHandler):
    def __init__(self, address):
        super(AppearanceOfBalanceTest, self).__init__()
        self.address = address
        self.dest = "balanced.AppearanceTest"
        self.error = None
        self.count        = 9
        self.n_sent       = 0
        self.n_received_a = 0
        self.n_received_b = 0
        self.n_received_c = 0

    def check_if_done(self):
        if self.n_received_a + self.n_received_b + self.n_received_c == self.count:
            if self.n_received_a != 3 or self.n_received_b != 3 or self.n_received_c != 3:
                self.error = "Incorrect Distribution: %d/%d/%d" % (self.n_received_a, self.n_received_b, self.n_received_c)
            self.timer.cancel()
            self.conn.close()

    def timeout(self):
        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
        self.conn.close()

    def on_start(self, event):
        self.timer      = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn       = event.container.connect(self.address)
        self.sender     = event.container.create_sender(self.conn, self.dest)
        self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
        self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
        self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")

    def send(self):
        if self.n_sent < self.count:
            msg = Message(body="Appearance-Test")
            self.sender.send(msg)
            self.n_sent += 1

    def on_sendable(self, event):
        if self.n_sent == 0:
            self.send()

    def on_message(self, event):
        if event.receiver == self.receiver_a:
            self.n_received_a += 1
        if event.receiver == self.receiver_b:
            self.n_received_b += 1
        if event.receiver == self.receiver_c:
            self.n_received_c += 1

    def on_accepted(self, event):
        self.send()
        self.check_if_done()

    def run(self):
        Container(self).run()


class BatchedSettlementTest(MessagingHandler):
    def __init__(self, address):
        super(BatchedSettlementTest, self).__init__(auto_accept=False)
        self.address = address
        self.dest = "balanced.BatchedSettlement"
        self.error = None
        self.count       = 200
        self.batch_count = 20
        self.n_sent      = 0
        self.n_received  = 0
        self.n_settled   = 0
        self.batch       = []
        self.accepted_count_match = False

    def check_if_done(self):
        if self.n_settled == self.count:
            local_node = Node.connect(self.address, timeout=TIMEOUT)
            outs = local_node.query(type='org.apache.qpid.dispatch.router')
            pos = outs.attribute_names.index("acceptedDeliveries")
            results = outs.results[0]
            if results[pos] >= self.count:
                self.accepted_count_match = True

            self.timer.cancel()
            self.conn.close()

    def timeout(self):
        self.error = "Timeout Expired: sent=%d rcvd=%d settled=%d" % \
                     (self.n_sent, self.n_received, self.n_settled)
        self.conn.close()

    def on_start(self, event):
        self.timer    = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn     = event.container.connect(self.address)
        self.sender   = event.container.create_sender(self.conn, self.dest)
        self.receiver = event.container.create_receiver(self.conn, self.dest)

    def send(self):
        while self.n_sent < self.count and self.sender.credit > 0:
            msg = Message(body="Batch-Test")
            self.sender.send(msg)
            self.n_sent += 1

    def on_sendable(self, event):
        self.send()

    def on_message(self, event):
        self.n_received += 1
        self.batch.insert(0, event.delivery)
        if len(self.batch) == self.batch_count:
            while len(self.batch) > 0:
                self.accept(self.batch.pop())

    def on_accepted(self, event):
        self.n_settled += 1
        self.check_if_done()

    def run(self):
        Container(self).run()


class RejectCoordinatorTest(MessagingHandler, TransactionHandler):
    def __init__(self, url):
        super(RejectCoordinatorTest, self).__init__(prefetch=0)
        self.url = Url(url)
        self.error = "The router can't coordinate transactions by itself, a linkRoute to a coordinator must be " \
                     "configured to use transactions."
        self.container = None
        self.conn = None
        self.sender = None
        self.timer = None
        self.passed = False
        self.link_error = False
        self.link_remote_close = False

    def timeout(self):
        self.conn.close()

    def check_if_done(self):
        if self.link_remote_close and self.link_error:
            self.passed = True
            self.conn.close()
            self.timer.cancel()

    def on_start(self, event):
        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.container = event.container
        self.conn = self.container.connect(self.url)
        self.sender = self.container.create_sender(self.conn, self.url.path)
        # declare_transaction tries to create a link with name "txn-ctrl" to the
        # transaction coordinator which has its own target, it has no address
        # The router cannot coordinate transactions itself and so there will be a link error when this
        # link is attempted to be created
        self.container.declare_transaction(self.conn, handler=self)

    def on_link_error(self, event):
        link = event.link
        # If the link name is 'txn-ctrl' and there is a link error and it matches self.error, then we know
        # that the router has rejected the link because it cannot coordinate transactions itself
        if link.name == "txn-ctrl" and link.remote_condition.description == self.error and \
                        link.remote_condition.name == 'amqp:precondition-failed':
            self.link_error = True
            self.check_if_done()

    def on_link_remote_close(self, event):
        link = event.link
        if link.name == "txn-ctrl":
            self.link_remote_close = True
            self.check_if_done()

    def run(self):
        Container(self).run()




class PresettledOverflowTest(MessagingHandler):
    def __init__(self, address):
        super(PresettledOverflowTest, self).__init__(prefetch=0)
        self.address = address
        self.dest = "balanced.PresettledOverflow"
        self.error = None
        self.count       = 500
        self.n_sent      = 0
        self.n_received  = 0
        self.last_seq    = -1

    def timeout(self):
        self.error = "Timeout Expired: sent=%d rcvd=%d last_seq=%d" % (self.n_sent, self.n_received, self.last_seq)
        self.conn.close()

    def on_start(self, event):
        self.timer    = event.reactor.schedule(TIMEOUT, Timeout(self))
        self.conn     = event.container.connect(self.address)
        self.sender   = event.container.create_sender(self.conn, self.dest)
        self.receiver = event.container.create_receiver(self.conn, self.dest)
        self.receiver.flow(10)

    def send(self):
        while self.n_sent < self.count and self.sender.credit > 0:
            msg = Message(body={"seq": self.n_sent})
            dlv = self.sender.send(msg)
            dlv.settle()
            self.n_sent += 1
        if self.n_sent == self.count:
            self.receiver.flow(self.count)

    def on_sendable(self, event):
        if self.n_sent < self.count:
            self.send()

    def on_message(self, event):
        self.n_received += 1
        self.last_seq = event.message.body["seq"]
        if self.last_seq == self.count - 1:
            if self.n_received == self.count:
                self.error = "No deliveries were dropped"

            if not self.error:
                local_node = Node.connect(self.address, timeout=TIMEOUT)
                out = local_node.query(type='org.apache.qpid.dispatch.router.link')

                for result in out.results:
                    if result[5] == 'out' and 'balanced.PresettledOverflow' in result[6]:
                        if result[16] != 249:
                            self.error = "Expected 249 dropped presettled deliveries but got " + str(result[16])
            self.conn.close()
            self.timer.cancel()

    def run(self):
        Container(self).run()


class RejectDispositionTest(MessagingHandler):
    def __init__(self, address):
        super(RejectDispositionTest, self).__init__(auto_accept=False)
        self.address = address
        self.sent = False
        self.received_error = False
        self.dest = "rejectDispositionTest"
        # explicitly convert to str due to
        # https://issues.apache.org/jira/browse/PROTON-1843
        self.error_description = str('you were out of luck this time!')
        self.error_name = u'amqp:internal-error'
        self.reject_count_match = False
        self.rejects_at_start = 0

    def count_rejects ( self ) :
        local_node = Node.connect(self.address, timeout=TIMEOUT)
        outs = local_node.query(type='org.apache.qpid.dispatch.router')
        pos = outs.attribute_names.index("rejectedDeliveries")
        results = outs.results[0]
        return results[pos]

    def on_start(self, event):
        conn = event.container.connect(self.address)
        event.container.create_sender(conn, self.dest)
        event.container.create_receiver(conn, self.dest)
        self.rejects_at_start = self.count_rejects ( )

    def on_sendable(self, event):
        if not self.sent:
            event.sender.send(Message(body=u"Hello World!"))
            self.sent = True

    def on_rejected(self, event):
        if event.delivery.remote.condition.description == self.error_description \
                and event.delivery.remote.condition.name == self.error_name:
            self.received_error = True
        rejects_now = self.count_rejects ( )
        rejects_for_this_test = rejects_now - self.rejects_at_start
        if rejects_for_this_test == 1:
            self.reject_count_match = True
        event.connection.close()

    def on_message(self, event):
        event.delivery.local.condition = Condition(self.error_name, self.error_description)
        self.reject(event.delivery)

    def run(self):
        Container(self).run()


class UnsettledLargeMessageTest(MessagingHandler):
    def __init__(self, addr, n_messages):
        super (UnsettledLargeMessageTest, self).__init__()
        self.addr = addr
        self.n_messages = n_messages
        self.sender = None
        self.receiver = None
        self.sender_conn = None
        self.recv_conn = None
        self.n_sent = 0
        self.n_received = 0
        self.error = None
        self.test_timer = None
        self.max_receive = 1
        self.custom_timer = None
        self.timer = None
        self.n_accepted = 0
        self.n_modified = 0
        self.n_released = 0
        self.str1 = "0123456789abcdef"
        self.msg_str = ""
        for i in range(16384):
            self.msg_str += self.str1

    def run (self):
        Container(self).run()

    def check_if_done(self):
        # self.n_accepted + self.n_modified + self.n_released will never
        # equal self.n_messages without the fix for DISPATCH-1197 because
        # the router will never pull the data from the proton buffers once
        # the router hits q2_holdoff
        if self.n_accepted + self.n_modified + \
                self.n_released == self.n_messages:
            self.timer.cancel()
            self.sender_conn.close()

    def timeout(self):
        self.error = "Timeout Expired: sent=%d accepted=%d " \
                     "released=%d modified=%d" % (self.n_messages,
                                                  self.n_accepted,
                                                  self.n_released,
                                                  self.n_modified)

    def on_start (self, event):
        self.sender_conn = event.container.connect(self.addr)
        self.recv_conn = event.container.connect(self.addr)
        self.receiver = event.container.create_receiver(self.recv_conn,
                                                        "test_42")
        self.sender = event.container.create_sender(self.sender_conn,
                                                    "test_42")
        self.timer = event.reactor.schedule(15, Timeout(self))

    def on_accepted(self, event):
        self.n_accepted += 1

    def on_released(self, event):
        if event.delivery.remote_state == Delivery.MODIFIED:
            self.n_modified += 1
        else:
            self.n_released += 1

        self.check_if_done()

    def on_sendable(self, event):
        while self.n_sent < self.n_messages:
            msg = Message(id=(self.n_sent + 1),
                          body={'sequence': (self.n_sent + 1),
                                'msg_str': self.msg_str})
            # Presettle the delivery.
            self.sender.send (msg)
            self.n_sent += 1

    def on_message(self, event):
        self.n_received += 1
        if self.n_received == self.max_receive:
            # Close the receiver connection after receiving just one message
            # This will cause the release of multi-frame deliveries.
            # Meanwhile the sender will keep sending but will run into
            # the q2_holodd situation and never recover.
            # The sender link will be stalled
            # This test will NEVER pass without the fix to DISPATCH-1197
            # Receiver bails after receiving max_receive messages.
            self.receiver.close()
            self.recv_conn.close()


class OneRouterUnavailableCoordinatorTest(TestCase):
    @classmethod
    def setUpClass(cls):
        super(OneRouterUnavailableCoordinatorTest, cls).setUpClass()
        name = "test-router"
        OneRouterTest.listen_port = cls.tester.get_port()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR',  'defaultDistribution': 'unavailable'}),
            ('listener', {'port': cls.tester.get_port() }),
            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ])
        cls.router = cls.tester.qdrouterd(name, config)
        cls.router.wait_ready()
        cls.address = cls.router.addresses[0]

    def test_46_coordinator_linkroute_unavailable_DISPATCH_1453(self):
        # The defaultDistribution on the router is unavailable. We try to connect a tx sender
        # to make sure a good detailed message saying "the link route to a coordinator must be
        # configured" is sent back.
        test = RejectCoordinatorGoodMessageTest(self.address)
        test.run()
        self.assertTrue(test.passed)

    def test_47_coordinator_linkroute_available_DISPATCH_1453(self):
        # The defaultDistribution on the router is unavailable. We create a link route with $coordinator address
        # The link route is not attached to any broker. When the attach comes in, the reject message must be
        # condition=:"qd:no-route-to-dest", description="No route to the destination node"
        COORDINATOR = "$coordinator"
        long_type = 'org.apache.qpid.dispatch.router.config.linkRoute'
        qd_manager = QdManager(self, address=self.address)
        args = {"prefix": COORDINATOR, "connection": "broker", "dir": "in"}
        qd_manager.create(long_type, args)
        link_route_created = False

        # Verify that the link route was created by querying for it.
        outs = qd_manager.query(long_type)[0]
        if outs:
            try:
                if outs['prefix'] == COORDINATOR:
                    link_route_created = True
            except:
                pass

        self.assertTrue(link_route_created)

        # We have verified that the link route has been created but there is no broker connections.
        # Now let's try to open a transaction. We should get a no route to destination error
        test = RejectCoordinatorGoodMessageTest(self.address, link_route_present=True)
        test.run()
        self.assertTrue(test.passed)


class RejectCoordinatorGoodMessageTest(RejectCoordinatorTest):
    def __init__(self, url, link_route_present=False):
        super(RejectCoordinatorGoodMessageTest, self).__init__(url)
        self.link_route_present = link_route_present
        self.error_with_link_route = "No route to the destination node"

    def on_link_error(self, event):
        link = event.link
        # If the link name is 'txn-ctrl' and there is a link error and it matches self.error, then we know
        # that the router has rejected the link because it cannot coordinate transactions itself
        if link.name == "txn-ctrl":
            if self.link_route_present:
                if link.remote_condition.description == self.error_with_link_route and link.remote_condition.name == 'qd:no-route-to-dest':
                    self.link_error = True
            else:
                if link.remote_condition.description == self.error and link.remote_condition.name == 'amqp:precondition-failed':
                    self.link_error = True

            self.check_if_done()

    def run(self):
        Container(self).run()


class Q2HoldoffDropTest(MessagingHandler):
    """
    Create 3 multicast receivers, two which grant 2 credits and one that grants
    only one.  Send enough data to force Q2 holdoff (since one rx is blocked)
    Close the stalled rx connection, verify the remaining receivers get the
    message (Q2 holdoff disabled)
    """
    def __init__(self, router):
        super(Q2HoldoffDropTest, self).__init__(prefetch=0,
                                                auto_accept=False,
                                                auto_settle=False)
        self.router = router
        self.rx_fast1_conn = None
        self.rx_fast1 = None
        self.rx_fast2_conn = None
        self.rx_fast2 = None
        self.rx_slow_conn = None
        self.rx_slow = None
        self.tx_conn = None
        self.tx = None
        self.timer = None
        self.reactor = None
        self.error = None
        self.n_attached = 0
        self.n_rx = 0
        self.n_tx = 0
        self.close_timer = 0

        # currently the router buffer size is 512 bytes and the Q2 holdoff
        # buffer chain high watermark is 256 buffers.  We need to send a
        # message that will be big enough to trigger Q2 holdoff
        self.big_msg = Message(body=["DISPATCH-1330" * (512 * 256 * 4)])

    def done(self):
        if self.timer:
            self.timer.cancel()
        if self.close_timer:
            self.close_timer.cancel()
        if self.tx_conn:
            self.tx_conn.close()
        if self.rx_fast1_conn:
            self.rx_fast1_conn.close()
        if self.rx_fast2_conn:
            self.rx_fast2_conn.close()
        if self.rx_slow_conn:
            self.rx_slow_conn.close()

    def timeout(self):
        self.error = "Timeout Expired"
        self.done()

    def on_start(self, event):
        self.reactor = event.reactor
        self.timer = self.reactor.schedule(TIMEOUT, Timeout(self))

        self.rx_slow_conn = event.container.connect(self.router.addresses[0])
        self.rx_fast1_conn = event.container.connect(self.router.addresses[0])
        self.rx_fast2_conn = event.container.connect(self.router.addresses[0])

        self.rx_slow = event.container.create_receiver(self.rx_slow_conn,
                                                       source="multicast.dispatch-1330",
                                                       name="rx_slow")
        self.rx_fast1 = event.container.create_receiver(self.rx_fast1_conn,
                                                        source="multicast.dispatch-1330",
                                                        name="rx_fast1")
        self.rx_fast2 = event.container.create_receiver(self.rx_fast2_conn,
                                                        source="multicast.dispatch-1330",
                                                        name="rx_fast2")

    def on_link_opened(self, event):
        if event.receiver:
            self.n_attached += 1
            if self.n_attached == 3:
                self.rx_fast1.flow(2)
                self.rx_fast2.flow(2)
                self.rx_slow.flow(1)   # stall on 2nd msg

                self.tx_conn = event.container.connect(self.router.addresses[0])
                self.tx = event.container.create_sender(self.tx_conn,
                                                        target="multicast.dispatch-1330",
                                                        name="tx")

    def on_sendable(self, event):
        if self.n_tx == 0:
            # wait until all subscribers present
            self.router.wait_address("multicast.dispatch-1330", subscribers=3)
            for i in range(2):
                dlv = self.tx.send(self.big_msg)
                dlv.settle()
                self.n_tx += 1

    def close_rx_slow(self, event):
        if self.rx_slow_conn:
            self.rx_slow_conn.close()
            self.rx_slow_conn = None
            self.rx_slow = None

    def on_message(self, event):
        self.n_rx += 1
        if self.n_rx == 3: # first will arrive, second is blocked

            class CloseTimer(Timeout):
                def on_timer_task(self, event):
                    self.parent.close_rx_slow(event)

            # 2 second wait for Q2 to fill up
            self.close_timer = self.reactor.schedule(2.0, CloseTimer(self))

        if self.n_rx == 5:
            # succesfully received on last two receivers
            self.done()

    def run(self):
        Container(self).run()

        # wait until the router has cleaned up the route table
        clean = False
        while not clean:
            clean = True
            atype = 'org.apache.qpid.dispatch.router.address'
            addrs = self.router.management.query(type=atype).get_dicts()
            if list(filter(lambda a: a['name'].find("dispatch-1330") != -1, addrs)):
                clean = False
                break
            if not clean:
                sleep(0.1)


class OneRouterTransactionalAttachTest(TestCase):
    """
    Verify that a transaction is properly forwarded through the router
    """

    class FakeTxnBroker(FakeBroker):
        """
        A FakeBroker that tracks Transaction declaration.
        Note well: Proton python does not provide the ability to set a delivery
        state to DECLARED (0x0033), so this broker cannot simulate a full
        transactional delivery.  At best we ensure that the router properly
        forwards the target capabilities and the declare message.
        """
        def __init__(self, url, container_id=None, **handler_kwargs):
            super(OneRouterTransactionalAttachTest.FakeTxnBroker,
                  self).__init__(url, container_id, **handler_kwargs)
            self.txn_link = None
            self.remote_caps = None
            self.declare_body = None

        def on_link_opening(self, event):
            if event.link.remote_target.type == Terminus.COORDINATOR:
                self.txn_link = event.link
                self.txn_link.source.copy(event.link.remote_source)
                self.txn_link.target.copy(event.link.remote_target)
                self.remote_caps = self.txn_link.remote_target.capabilities
                self.txn_link.flow(1)
            else:
                super(OneRouterTransactionalAttachTest.FakeTxnBroker,
                      self).on_link_opening(event)

        def on_message(self, event):
            if event.link == self.txn_link:
                self.declare_body = event.message.body
                event.delivery.update(Delivery.REJECTED)
                event.delivery.settle()
            else:
                super(OneRouterTransactionalAttachTest.FakeTxnBroker,
                      self).on_message(event)


    class TxSender(MessagingHandler, TransactionHandler):
        """
        Transactional publisher client.  The transaction will fail since the
        fake broker cannot declare the transaction properly
        """
        def __init__(self, url, messages=1):
            super(OneRouterTransactionalAttachTest.TxSender, self).__init__()
            self.url = Url(url)
            self.sent = 0
            self.declare_failed = False
            self.total = messages

        def on_start(self, event):
            self.container = event.container
            self.conn = self.container.connect(self.url)
            self.sender = self.container.create_sender(self.conn, self.url.path)
            self.container.declare_transaction(self.conn, handler=self)
            self.transaction = None

        def on_transaction_declared(self, event):
            self.transaction = event.transaction
            self.declare_failed = False
            self.send()

        def on_sendable(self, event):
            self.send()

        def send(self):
            if self.transaction and self.sender.credit > 0 and self.sent < self.total:
                seq = self.sent
                self.sent -= 1
                msg = Message(id=seq, body={'sequence':seq})
                self.transaction.send(self.sender, msg)
                self.transaction.commit()
                self.transaction = None

        def on_transaction_declare_failed(self, event):
            # expected to fail, since the FakeBroker cannot declare a transaction
            self.declare_failed = True
            self.conn.close()


    @classmethod
    def setUpClass(cls):
        super(OneRouterTransactionalAttachTest, cls).setUpClass()
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'TxnRouter'}),
            ('listener', {'port': cls.tester.get_port() }),
            ('connector', {'port': cls.tester.get_port(),
                           'role': 'route-container'}),

            ('linkRoute', {'prefix': "$coordinator",
                           'containerId': "FakeBroker",
                           'direction': "in"}),

            ('linkRoute', {'prefix': 'closest/queue01',
                           'containerId': 'FakeBroker',
                           'direction': 'in'}),
            ('linkRoute', {'prefix': 'closest/queue01',
                           'containerId': 'FakeBroker',
                           'direction': 'out'}),

            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ])
        cls.router = cls.tester.qdrouterd('TxnRouter', config, wait=False)
        cls.listener = cls.router.addresses[0]
        cls.connector = cls.router.connector_addresses[0]

        cls.broker = cls.FakeTxnBroker(url=cls.connector,
                                       prefetch=0,
                                       auto_accept=False,
                                       auto_settle=False)
        cls.router.wait_connectors()
        cls.router.wait_address("closest/queue01")

    def test_01_verify_attach(self):
        """
        Verify the transaction link attach is correctly forwarded to the broker
        """
        client = self.TxSender(url=self.listener)
        Container(client).run()
        self.assertTrue(client.declare_failed)
        self.assertTrue(self.broker.txn_link is not None)
        self.assertTrue(self.broker.declare_body is not None)
        self.assertEqual(symbol('amqp:declare:list'),
                         self.broker.declare_body.descriptor)
        if PROTON_VERSION >= (0, 30, 0):
            # prior to proton 0.30.0 capabilities were not provided
            # see PROTON-2138
            self.assertTrue(self.broker.remote_caps is not None)
            # capabilities should be a list with a txn-capability type
            # verify router has forwarded this correctly:
            rc = self.broker.remote_caps
            rc.rewind()
            count = 0
            while rc.next() == Data.SYMBOL:
                s = rc.get_symbol()
                self.assertTrue(s in [symbol('amqp:local-transactions'),
                                      symbol('amqp:distributed-transactions'),
                                      symbol('amqp:promotable-transactions'),
                                      symbol('amqp:multi-txns-per-ssn'),
                                      symbol('amqp:multi-ssns-per-txn')])
                count += 1
            self.assertTrue(count > 0)


if __name__ == '__main__':
    unittest.main(main_module())
