DISPATCH-1488: add test to verify transanction handling
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 77e8804..3810a8f 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -28,10 +28,15 @@
from proton.handlers import MessagingHandler, TransactionHandler
from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
from proton.utils import BlockingConnection, SyncRequestResponse
+from proton import VERSION as PROTON_VERSION
+from proton import Terminus
+from proton import Data
from qpid_dispatch.management.client import Node
import os, json
from subprocess import PIPE, STDOUT
from time import sleep
+from test_broker import FakeBroker
+
CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
CONNECTION_PROPERTIES_SYMBOL = dict()
@@ -3149,6 +3154,7 @@
self.receiver.close()
self.recv_conn.close()
+
class OneRouterUnavailableCoordinatorTest(TestCase):
@classmethod
def setUpClass(cls):
@@ -3355,5 +3361,154 @@
sleep(0.1)
+class OneRouterTransactionalAttachTest(TestCase):
+ """
+ Verify that a transaction is properly forwarded through the router
+ """
+
+ class FakeTxnBroker(FakeBroker):
+ """
+ A FakeBroker that tracks Transaction declaration.
+ Note well: Proton python does not provide the ability to set a delivery
+ state to DECLARED (0x0033), so this broker cannot simulate a full
+ transactional delivery. At best we ensure that the router properly
+ forwards the target capabilities and the declare message.
+ """
+ def __init__(self, url, container_id=None, **handler_kwargs):
+ super(OneRouterTransactionalAttachTest.FakeTxnBroker,
+ self).__init__(url, container_id, **handler_kwargs)
+ self.txn_link = None
+ self.remote_caps = None
+ self.declare_body = None
+
+ def on_link_opening(self, event):
+ if event.link.remote_target.type == Terminus.COORDINATOR:
+ self.txn_link = event.link
+ self.txn_link.source.copy(event.link.remote_source)
+ self.txn_link.target.copy(event.link.remote_target)
+ self.remote_caps = self.txn_link.remote_target.capabilities
+ self.txn_link.flow(1)
+ else:
+ super(OneRouterTransactionalAttachTest.FakeTxnBroker,
+ self).on_link_opening(event)
+
+ def on_message(self, event):
+ if event.link == self.txn_link:
+ self.declare_body = event.message.body
+ event.delivery.update(Delivery.REJECTED)
+ event.delivery.settle()
+ else:
+ super(OneRouterTransactionalAttachTest.FakeTxnBroker,
+ self).on_message(event)
+
+
+ class TxSender(MessagingHandler, TransactionHandler):
+ """
+ Transactional publisher client. The transaction will fail since the
+ fake broker cannot declare the transaction properly
+ """
+ def __init__(self, url, messages=1):
+ super(OneRouterTransactionalAttachTest.TxSender, self).__init__()
+ self.url = Url(url)
+ self.sent = 0
+ self.declare_failed = False
+ self.total = messages
+
+ def on_start(self, event):
+ self.container = event.container
+ self.conn = self.container.connect(self.url)
+ self.sender = self.container.create_sender(self.conn, self.url.path)
+ self.container.declare_transaction(self.conn, handler=self)
+ self.transaction = None
+
+ def on_transaction_declared(self, event):
+ self.transaction = event.transaction
+ self.declare_failed = False
+ self.send()
+
+ def on_sendable(self, event):
+ self.send()
+
+ def send(self):
+ if self.transaction and self.sender.credit > 0 and self.sent < self.total:
+ seq = self.sent
+ self.sent -= 1
+ msg = Message(id=seq, body={'sequence':seq})
+ self.transaction.send(self.sender, msg)
+ self.transaction.commit()
+ self.transaction = None
+
+ def on_transaction_declare_failed(self, event):
+ # expected to fail, since the FakeBroker cannot declare a transaction
+ self.declare_failed = True
+ self.conn.close()
+
+
+ @classmethod
+ def setUpClass(cls):
+ super(OneRouterTransactionalAttachTest, cls).setUpClass()
+ config = Qdrouterd.Config([
+ ('router', {'mode': 'standalone', 'id': 'TxnRouter'}),
+ ('listener', {'port': cls.tester.get_port() }),
+ ('connector', {'port': cls.tester.get_port(),
+ 'role': 'route-container'}),
+
+ ('linkRoute', {'prefix': "$coordinator",
+ 'containerId': "FakeBroker",
+ 'direction': "in"}),
+
+ ('linkRoute', {'prefix': 'closest/queue01',
+ 'containerId': 'FakeBroker',
+ 'direction': 'in'}),
+ ('linkRoute', {'prefix': 'closest/queue01',
+ 'containerId': 'FakeBroker',
+ 'direction': 'out'}),
+
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ])
+ cls.router = cls.tester.qdrouterd('TxnRouter', config, wait=False)
+ cls.listener = cls.router.addresses[0]
+ cls.connector = cls.router.connector_addresses[0]
+
+ cls.broker = cls.FakeTxnBroker(url=cls.connector,
+ prefetch=0,
+ auto_accept=False,
+ auto_settle=False)
+ cls.router.wait_connectors()
+ cls.router.wait_address("closest/queue01")
+
+ def test_01_verify_attach(self):
+ """
+ Verify the transaction link attach is correctly forwarded to the broker
+ """
+ client = self.TxSender(url=self.listener)
+ Container(client).run()
+ self.assertTrue(client.declare_failed)
+ self.assertTrue(self.broker.txn_link is not None)
+ self.assertTrue(self.broker.declare_body is not None)
+ self.assertEqual(symbol('amqp:declare:list'),
+ self.broker.declare_body.descriptor)
+ if PROTON_VERSION >= (0, 30, 0):
+ # prior to proton 0.30.0 capabilities were not provided
+ # see PROTON-2138
+ self.assertTrue(self.broker.remote_caps is not None)
+ # capabilities should be a list with a txn-capability type
+ # verify router has forwarded this correctly:
+ rc = self.broker.remote_caps
+ rc.rewind()
+ count = 0
+ while rc.next() == Data.SYMBOL:
+ s = rc.get_symbol()
+ self.assertTrue(s in [symbol('amqp:local-transactions'),
+ symbol('amqp:distributed-transactions'),
+ symbol('amqp:promotable-transactions'),
+ symbol('amqp:multi-txns-per-ssn'),
+ symbol('amqp:multi-ssns-per-txn')])
+ count += 1
+ self.assertTrue(count > 0)
+
+
if __name__ == '__main__':
unittest.main(main_module())
diff --git a/tests/test_broker.py b/tests/test_broker.py
index 3b99bac..2f101a7 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -85,8 +85,8 @@
except IndexError: # no more messages
return 0
- def __init__(self, url, container_id=None):
- super(FakeBroker, self).__init__()
+ def __init__(self, url, container_id=None, **handler_kwargs):
+ super(FakeBroker, self).__init__(**handler_kwargs)
self.url = url
self.queues = {}
self.acceptor = None