# blob: a9619bcdb8939a37d05eae72439ac5bcde545936 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException
from qpid.testlib import TestBase010
from qpid.compat import set
from struct import pack, unpack
from time import sleep
class DtxTests(TestBase010):
    """
    Tests for the amqp dtx related classes.

    Tests of the form test_simple_xxx test the basic transactional
    behaviour. The approach here is to 'swap' a message from one queue
    to another by consuming and re-publishing in the same
    transaction. That transaction is then completed in different ways
    and the appropriate result verified.

    The other tests enforce more specific rules and behaviour on a
    per-method or per-field basis.
    """

    # Values compared against the ``status`` attribute of dtx command results
    # in the tests of this class.

    # Expected from dtx_prepare after the session that did the transactional
    # work has been closed (see test_implicit_end).
    XA_RBROLLBACK = 1
    # Expected from dtx_end and dtx_rollback after the transaction timeout
    # has elapsed (see test_set_timeout).
    XA_RBTIMEOUT = 2
    # Expected whenever a dtx command is supposed to complete normally.
    XA_OK = 0

    # Class-wide counter incremented by xid(); it is used to build the branch
    # id so that two xids never share the same branch qualifier.
    tx_counter = 0
def reset_channel(self):
    """
    Swap self.session for a freshly opened one.

    The session currently held is closed first; a new session named
    "dtx-session" is then requested from self.conn and stored on self.
    """
    current = self.session
    current.close()
    self.session = self.conn.session("dtx-session", 1)
def test_simple_commit(self):
    """
    One-phase commit: the swapped message only becomes visible on
    queue-b after dtx_commit(one_phase=True) has returned XA_OK.
    """
    keeper = self.keepQueuesAlive(["queue-a", "queue-b"])
    ssn = self.session
    txn = self.xid("my-xid")
    self.txswap(txn, "commit")

    # while the transaction is pending, both queues must look empty
    for name in ("queue-a", "queue-b"):
        self.assertMessageCount(0, name)

    # complete the transaction in a single phase
    outcome = ssn.dtx_commit(xid=txn, one_phase=True)
    self.assertEqual(self.XA_OK, outcome.status)

    # reopen the session so that no unacked messages are held back
    self.reset_channel()

    # the message must have moved from queue-a to queue-b
    self.assertMessageCount(0, "queue-a")
    self.assertMessageCount(1, "queue-b")
    self.assertMessageId("commit", "queue-b")
def test_simple_prepare_commit(self):
    """
    Two-phase commit: prepare the swap transaction, check nothing is
    visible yet, then commit with one_phase=False and verify the move.
    """
    keeper = self.keepQueuesAlive(["queue-a", "queue-b"])
    ssn = self.session
    txn = self.xid("my-xid")
    self.txswap(txn, "prepare-commit")

    # phase one
    prepared = ssn.dtx_prepare(xid=txn)
    self.assertEqual(self.XA_OK, prepared.status)

    # a prepared transaction must not expose its work on either queue
    for name in ("queue-a", "queue-b"):
        self.assertMessageCount(0, name)

    # phase two
    committed = ssn.dtx_commit(xid=txn, one_phase=False)
    self.assertEqual(self.XA_OK, committed.status)
    self.reset_channel()

    # the message must have moved from queue-a to queue-b
    self.assertMessageCount(0, "queue-a")
    self.assertMessageCount(1, "queue-b")
    self.assertMessageId("prepare-commit", "queue-b")
def test_simple_rollback(self):
    """
    Rollback without prepare: after dtx_rollback the swapped message
    must be back on queue-a and queue-b must stay empty.
    """
    keeper = self.keepQueuesAlive(["queue-a", "queue-b"])
    ssn = self.session
    txn = self.xid("my-xid")
    self.txswap(txn, "rollback")

    # while the transaction is pending, both queues must look empty
    for name in ("queue-a", "queue-b"):
        self.assertMessageCount(0, name)

    # undo the transactional work
    outcome = ssn.dtx_rollback(xid=txn)
    self.assertEqual(self.XA_OK, outcome.status)
    self.reset_channel()

    # the message must be where it started
    self.assertMessageCount(1, "queue-a")
    self.assertMessageCount(0, "queue-b")
    self.assertMessageId("rollback", "queue-a")
def test_simple_prepare_rollback(self):
    """
    Rollback of a prepared transaction: prepare the swap, check nothing
    is visible, roll back and verify the message is back on queue-a.
    """
    keeper = self.keepQueuesAlive(["queue-a", "queue-b"])
    ssn = self.session
    txn = self.xid("my-xid")
    self.txswap(txn, "prepare-rollback")

    # phase one
    prepared = ssn.dtx_prepare(xid=txn)
    self.assertEqual(self.XA_OK, prepared.status)

    # a prepared transaction must not expose its work on either queue
    for name in ("queue-a", "queue-b"):
        self.assertMessageCount(0, name)

    # abandon the prepared work
    rolled_back = ssn.dtx_rollback(xid=txn)
    self.assertEqual(self.XA_OK, rolled_back.status)
    self.reset_channel()

    # the message must be where it started
    self.assertMessageCount(1, "queue-a")
    self.assertMessageCount(0, "queue-b")
    self.assertMessageId("prepare-rollback", "queue-a")
def test_select_required(self):
"""
check that an error is flagged if select is not issued before
start or end
"""
session = self.session
tx = self.xid("dummy")
try:
session.dtx_start(xid=tx)
#if we get here we have failed, but need to do some cleanup:
session.dtx_end(xid=tx)
session.dtx_rollback(xid=tx)
self.fail("Session not selected for use with dtx, expected exception!")
except SessionException, e:
self.assertEquals(503, e.args[0].error_code)
def test_start_already_known(self):
"""
Verify that an attempt to start an association with a
transaction that is already known is not allowed (unless the
join flag is set).
"""
#create two sessions on different connection & select them for use with dtx:
session1 = self.session
session1.dtx_select()
other = self.connect()
session2 = other.session("other", 0)
session2.dtx_select()
#create a xid
tx = self.xid("dummy")
#start work on one session under that xid:
session1.dtx_start(xid=tx)
#then start on the other without the join set
failed = False
try:
session2.dtx_start(xid=tx)
except SessionException, e:
failed = True
error = e
#cleanup:
if not failed:
session2.dtx_end(xid=tx)
other.close()
session1.dtx_end(xid=tx)
session1.dtx_rollback(xid=tx)
#verification:
if failed: self.assertEquals(530, error.args[0].error_code)
else: self.fail("Xid already known, expected exception!")
def test_forget_xid_on_completion(self):
    """
    Once a transaction has completed, a xid with the same global id can
    be started, ended and rolled back again without error.
    """
    # run a complete transaction first
    self.test_simple_commit()

    # test_simple_commit replaced self.session, so dtx must be selected again
    fresh = self.session
    fresh.dtx_select()

    # reuse the global id of the transaction that has just completed
    txn = self.xid("my-xid")
    fresh.dtx_start(xid=txn)
    fresh.dtx_end(xid=txn)
    fresh.dtx_rollback(xid=txn)
def test_start_join_and_resume(self):
"""
Ensure the correct error is signalled when both the join and
resume flags are set on starting an association between a
session and a transcation.
"""
session = self.session
session.dtx_select()
tx = self.xid("dummy")
try:
session.dtx_start(xid=tx, join=True, resume=True)
#failed, but need some cleanup:
session.dtx_end(xid=tx)
session.dtx_rollback(xid=tx)
self.fail("Join and resume both set, expected exception!")
except SessionException, e:
self.assertEquals(503, e.args[0].error_code)
def test_start_join(self):
    """
    Join behaviour: a second session starts the same xid with join=True,
    both sessions do work under it, and a single commit publishes the
    work of both.
    """
    keeper = self.keepQueuesAlive(["one", "two"])

    # two dtx-enabled sessions on the same connection
    first = self.session
    first.dtx_select()
    second = self.conn.session("second", 2)
    second.dtx_select()

    # setup: one message on each queue
    for name in ("one", "two"):
        first.queue_declare(queue=name, auto_delete=True)
    first.message_transfer(self.createMessage(first, "one", "a", "DtxMessage"))
    first.message_transfer(self.createMessage(first, "two", "b", "DtxMessage"))

    txn = self.xid("dummy")

    # the first session starts the xid, the second one joins it
    first.dtx_start(xid=txn)
    second.dtx_start(xid=txn, join=True)

    # each session moves one message in the opposite direction
    self.swap(first, "one", "two")   # 'a' goes from 'one' to 'two'
    self.swap(second, "two", "one")  # 'b' goes from 'two' to 'one'

    # both associations are ended before completion
    first.dtx_end(xid=txn)
    second.dtx_end(xid=txn)

    # commit once and check that both swaps took effect
    first.dtx_commit(xid=txn, one_phase=True)
    self.assertMessageCount(1, "one")
    self.assertMessageCount(1, "two")
    self.assertMessageId("a", "two")
    self.assertMessageId("b", "one")
def test_suspend_resume(self):
    """
    An association ended with suspend=True can be picked up again with
    resume=True; work done before and after the suspension is committed
    together.
    """
    ssn = self.session
    ssn.dtx_select()

    # setup: one message on each exclusive queue
    for name in ("one", "two"):
        ssn.queue_declare(queue=name, exclusive=True, auto_delete=True)
    ssn.message_transfer(self.createMessage(ssn, "one", "a", "DtxMessage"))
    ssn.message_transfer(self.createMessage(ssn, "two", "b", "DtxMessage"))

    txn = self.xid("dummy")

    # first half of the work, then suspend
    ssn.dtx_start(xid=txn)
    self.swap(ssn, "one", "two")  # 'a' goes from 'one' to 'two'
    ssn.dtx_end(xid=txn, suspend=True)

    # resume and do the second half
    ssn.dtx_start(xid=txn, resume=True)
    self.swap(ssn, "two", "one")  # 'b' goes from 'two' to 'one'
    ssn.dtx_end(xid=txn)

    # commit and check that both swaps took effect
    ssn.dtx_commit(xid=txn, one_phase=True)
    self.assertMessageCount(1, "one")
    self.assertMessageCount(1, "two")
    self.assertMessageId("a", "two")
    self.assertMessageId("b", "one")
def test_suspend_start_end_resume(self):
    """
    Test suspension and resumption of an association with work
    done on another transaction when the first transaction is
    suspended

    NOTE(review): as written, the body performs the same steps as
    test_suspend_resume and never starts a second transaction while
    the first one is suspended - confirm whether that step is missing.
    """
    ssn = self.session
    ssn.dtx_select()

    # setup: one message on each exclusive queue
    for name in ("one", "two"):
        ssn.queue_declare(queue=name, exclusive=True, auto_delete=True)
    ssn.message_transfer(self.createMessage(ssn, "one", "a", "DtxMessage"))
    ssn.message_transfer(self.createMessage(ssn, "two", "b", "DtxMessage"))

    txn = self.xid("dummy")

    # first half of the work, then suspend
    ssn.dtx_start(xid=txn)
    self.swap(ssn, "one", "two")  # 'a' goes from 'one' to 'two'
    ssn.dtx_end(xid=txn, suspend=True)

    # resume and do the second half
    ssn.dtx_start(xid=txn, resume=True)
    self.swap(ssn, "two", "one")  # 'b' goes from 'two' to 'one'
    ssn.dtx_end(xid=txn)

    # commit and check that both swaps took effect
    ssn.dtx_commit(xid=txn, one_phase=True)
    self.assertMessageCount(1, "one")
    self.assertMessageCount(1, "two")
    self.assertMessageId("a", "two")
    self.assertMessageId("b", "one")
def test_end_suspend_and_fail(self):
"""
Verify that the correct error is signalled if the suspend and
fail flag are both set when disassociating a transaction from
the session
"""
session = self.session
session.dtx_select()
tx = self.xid("suspend_and_fail")
session.dtx_start(xid=tx)
try:
session.dtx_end(xid=tx, suspend=True, fail=True)
self.fail("Suspend and fail both set, expected exception!")
except SessionException, e:
self.assertEquals(503, e.args[0].error_code)
#cleanup
other = self.connect()
session = other.session("cleanup", 1)
session.dtx_rollback(xid=tx)
session.close()
other.close()
def test_end_unknown_xid(self):
"""
Verifies that the correct exception is thrown when an attempt
is made to end the association for a xid not previously
associated with the session
"""
session = self.session
session.dtx_select()
tx = self.xid("unknown-xid")
try:
session.dtx_end(xid=tx)
self.fail("Attempted to end association with unknown xid, expected exception!")
except SessionException, e:
self.assertEquals(409, e.args[0].error_code)
def test_end(self):
    """
    After dtx_end the session no longer works under the transaction: a
    message published afterwards is available straight away, while the
    one published before dtx_end only shows up once the xid is committed.
    """
    keeper = self.keepQueuesAlive(["tx-queue"])
    alt = self.conn.session("alternate", 1)

    # the first message is published under the transaction
    alt.dtx_select()
    txn = self.xid("dummy")
    alt.dtx_start(xid=txn)
    alt.message_transfer(self.createMessage(alt, "tx-queue", "one", "DtxMessage"))
    alt.dtx_end(xid=txn)

    # the second message is published after the association has ended
    alt.message_transfer(self.createMessage(alt, "tx-queue", "two", "DtxMessage"))

    # only the second message may be available at this point
    self.assertMessageCount(1, "tx-queue")
    self.subscribe(alt, queue="tx-queue", destination="results")
    delivered = alt.incoming("results").get(timeout=1)
    self.assertEqual("two", self.getMessageProperty(delivered, 'correlation_id'))
    alt.message_cancel(destination="results")

    # accept the delivery, then drop the alternate session
    alt.message_accept(RangedSet(delivered.id))
    alt.close()

    # committing through the main session must release the first message,
    # and nothing else
    main = self.session
    main.dtx_commit(xid=txn, one_phase=True)
    self.assertMessageCount(1, "tx-queue")
    self.assertMessageId("one", "tx-queue")
def test_invalid_commit_one_phase_true(self):
"""
Test that a commit with one_phase = True is rejected if the
transaction in question has already been prepared.
"""
other = self.connect()
tester = other.session("tester", 1)
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
tester.dtx_select()
tx = self.xid("dummy")
tester.dtx_start(xid=tx)
tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
tester.dtx_end(xid=tx)
tester.dtx_prepare(xid=tx)
failed = False
try:
tester.dtx_commit(xid=tx, one_phase=True)
except SessionException, e:
failed = True
error = e
if failed:
self.session.dtx_rollback(xid=tx)
self.assertEquals(409, error.args[0].error_code)
else:
tester.close()
other.close()
self.fail("Invalid use of one_phase=True, expected exception!")
def test_invalid_commit_one_phase_false(self):
"""
Test that a commit with one_phase = False is rejected if the
transaction in question has not yet been prepared.
"""
other = self.connect()
tester = other.session("tester", 1)
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
tester.dtx_select()
tx = self.xid("dummy")
tester.dtx_start(xid=tx)
tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
tester.dtx_end(xid=tx)
failed = False
try:
tester.dtx_commit(xid=tx, one_phase=False)
except SessionException, e:
failed = True
error = e
if failed:
self.session.dtx_rollback(xid=tx)
self.assertEquals(409, error.args[0].error_code)
else:
tester.close()
other.close()
self.fail("Invalid use of one_phase=False, expected exception!")
def test_invalid_commit_not_ended(self):
"""
Test that a commit fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
self.session.dtx_select()
tx = self.xid("dummy")
self.session.dtx_start(xid=tx)
self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
failed = False
try:
tester.dtx_commit(xid=tx, one_phase=False)
except SessionException, e:
failed = True
error = e
if failed:
self.session.dtx_end(xid=tx)
self.session.dtx_rollback(xid=tx)
self.assertEquals(409, error.args[0].error_code)
else:
tester.close()
other.close()
self.fail("Commit should fail as xid is still associated!")
def test_invalid_rollback_not_ended(self):
"""
Test that a rollback fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
self.session.dtx_select()
tx = self.xid("dummy")
self.session.dtx_start(xid=tx)
self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
failed = False
try:
tester.dtx_rollback(xid=tx)
except SessionException, e:
failed = True
error = e
if failed:
self.session.dtx_end(xid=tx)
self.session.dtx_rollback(xid=tx)
self.assertEquals(409, error.args[0].error_code)
else:
tester.close()
other.close()
self.fail("Rollback should fail as xid is still associated!")
def test_invalid_prepare_not_ended(self):
"""
Test that a prepare fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
self.session.dtx_select()
tx = self.xid("dummy")
self.session.dtx_start(xid=tx)
self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
failed = False
try:
tester.dtx_prepare(xid=tx)
except SessionException, e:
failed = True
error = e
if failed:
self.session.dtx_end(xid=tx)
self.session.dtx_rollback(xid=tx)
self.assertEquals(409, error.args[0].error_code)
else:
tester.close()
other.close()
self.fail("Rollback should fail as xid is still associated!")
def test_implicit_end(self):
"""
Test that an association is implicitly ended when the session
is closed (whether by exception or explicit client request)
and the transaction in question is marked as rollback only.
"""
session1 = self.session
session2 = self.conn.session("other", 2)
#setup:
session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever"))
tx = self.xid("dummy")
session2.dtx_select()
session2.dtx_start(xid=tx)
session2.message_subscribe(queue="dummy", destination="dummy")
session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1)
session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
msg = session2.incoming("dummy").get(timeout=1)
session2.message_accept(RangedSet(msg.id))
session2.message_cancel(destination="dummy")
session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever"))
session2.close()
self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status)
session1.dtx_rollback(xid=tx)
def test_get_timeout(self):
    """
    dtx_get_timeout must report 60 before any timeout is set and 200
    after dtx_set_timeout(timeout=200); the transaction must still end
    and roll back with XA_OK afterwards.
    """
    ssn = self.session
    txn = self.xid("dummy")
    ssn.dtx_select()
    ssn.dtx_start(xid=txn)

    # 60 is the expected default of the dtx-default-timeout broker option
    initial = ssn.dtx_get_timeout(xid=txn)
    self.assertEqual(60, initial.timeout)

    ssn.dtx_set_timeout(xid=txn, timeout=200)
    updated = ssn.dtx_get_timeout(xid=txn)
    self.assertEqual(200, updated.timeout)

    # a transaction with a timeout must still complete normally
    self.assertEqual(self.XA_OK, ssn.dtx_end(xid=txn).status)
    self.assertEqual(self.XA_OK, ssn.dtx_rollback(xid=txn).status)
def test_set_timeout(self):
    """
    A transaction given a 2 second timeout must have been rolled back
    once 3 seconds have passed, and dtx_end / dtx_rollback must then
    report XA_RBTIMEOUT.
    """
    keeper = self.keepQueuesAlive(["queue-a", "queue-b"])

    # the work happens on a separate session so that self.session stays
    # free for inspecting the queues
    worker = self.conn.session("worker", 1)

    # setup: a single message waiting on queue-a
    txn = self.xid("dummy")
    for name in ("queue-a", "queue-b"):
        worker.queue_declare(queue=name, auto_delete=True)
    worker.message_transfer(self.createMessage(worker, "queue-a", "timeout", "DtxMessage"))

    # move it to queue-b under the transaction, then let the timeout expire
    worker.dtx_select()
    worker.dtx_start(xid=txn)
    self.swap(worker, "queue-a", "queue-b")
    worker.dtx_set_timeout(xid=txn, timeout=2)
    sleep(3)

    # the swap must already have been undone
    self.assertMessageCount(1, "queue-a")
    self.assertMessageCount(0, "queue-b")
    self.assertMessageId("timeout", "queue-a")

    # completing the transaction must report the timeout
    ended = worker.dtx_end(xid=txn)
    self.assertEqual(self.XA_RBTIMEOUT, ended.status)
    rolled_back = worker.dtx_rollback(xid=txn)
    self.assertEqual(self.XA_RBTIMEOUT, rolled_back.status)
def test_set_timeout_too_high(self):
"""
Test the timeout can't be more than --dtx-max-timeout
broker option
"""
session = self.session
tx = self.xid("dummy")
session.dtx_select()
session.dtx_start(xid=tx)
try:
session.dtx_set_timeout(xid=tx, timeout=3601)
except SessionException, e:
self.assertEquals(542, e.args[0].error_code)
def test_recover(self):
    """
    Of nine transactions, four are prepared and the rest rolled back;
    every prepared one must then appear in the in_doubt list returned
    by dtx_recover.
    """
    ssn = self.session
    ssn.dtx_select()
    ssn.queue_declare(queue="dummy", exclusive=True, auto_delete=True)

    to_prepare = (2, 5, 6, 8)
    prepared = []
    for i in range(1, 10):
        txn = self.xid("tx%s" % i)
        label = "message%s" % i
        ssn.dtx_start(xid=txn)
        ssn.message_transfer(self.createMessage(ssn, "dummy", label, label))
        ssn.dtx_end(xid=txn)
        if i in to_prepare:
            ssn.dtx_prepare(xid=txn)
            prepared.append(txn)
        else:
            ssn.dtx_rollback(xid=txn)

    in_doubt = ssn.dtx_recover().in_doubt

    # roll back everything recover reported, so nothing is left prepared
    for pending in in_doubt:
        ssn.dtx_rollback(xid=pending)

    # compare by global id (TODO: come up with nicer way to test these)
    actual = set([x.global_id for x in in_doubt])
    expected = set([x.global_id for x in prepared])
    if actual.intersection(expected) != expected:
        missing = expected.difference(actual)
        extra = actual.difference(expected)
        self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
def test_bad_resume(self):
"""
Test that a resume on a session not selected for use with dtx fails
"""
session = self.session
try:
session.dtx_start(resume=True)
except SessionException, e:
self.assertEquals(503, e.args[0].error_code)
def test_prepare_unknown(self):
session = self.session
try:
session.dtx_prepare(xid=self.xid("unknown"))
except SessionException, e:
self.assertEquals(404, e.args[0].error_code)
def test_commit_unknown(self):
session = self.session
try:
session.dtx_commit(xid=self.xid("unknown"))
except SessionException, e:
self.assertEquals(404, e.args[0].error_code)
def test_rollback_unknown(self):
session = self.session
try:
session.dtx_rollback(xid=self.xid("unknown"))
except SessionException, e:
self.assertEquals(404, e.args[0].error_code)
def test_get_timeout_unknown(self):
session = self.session
try:
session.dtx_get_timeout(xid=self.xid("unknown"))
except SessionException, e:
self.assertEquals(404, e.args[0].error_code)
def xid(self, txid):
    """
    Build a xid for the given global id with a branch id not used before.

    The class-wide DtxTests.tx_counter is incremented on every call and
    the branch id is "v" followed by the new counter value. The xid
    itself is created through self.session.xid with format 0.
    """
    serial = DtxTests.tx_counter + 1
    DtxTests.tx_counter = serial
    branch = "v%s" % serial
    return self.session.xid(format=0, global_id=txid, branch_id=branch)
def txswap(self, tx, id):
    """
    Move one message from queue-a to queue-b under the transaction tx.

    Both queues are declared, a message whose correlation id is `id` is
    published to queue-a, and the swap is then performed between
    dtx_start and dtx_end on self.session. Both dtx calls must report
    XA_OK. Completing the transaction is left to the caller.
    """
    ssn = self.session

    # the two queues taking part in the swap
    for name in ("queue-a", "queue-b"):
        ssn.queue_declare(queue=name, auto_delete=True)

    # seed queue-a with the message to be moved
    delivery = ssn.delivery_properties(routing_key="queue-a")
    properties = ssn.message_properties(correlation_id=id)
    ssn.message_transfer(message=Message(delivery, properties, "DtxMessage"))

    # open the transaction
    ssn.dtx_select()
    started = ssn.dtx_start(xid=tx)
    self.assertEqual(self.XA_OK, started.status)

    # consume from queue-a and re-publish to queue-b inside it
    self.swap(ssn, "queue-a", "queue-b")

    # close the transactional work
    ended = ssn.dtx_end(xid=tx)
    self.assertEqual(self.XA_OK, ended.status)
def swap(self, session, src, dest):
#consume from src:
session.message_subscribe(destination="temp-swap", queue=src)
session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1)
session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
msg = session.incoming("temp-swap").get(timeout=1)
session.message_cancel(destination="temp-swap")
session.message_accept(RangedSet(msg.id))
#todo: also complete at this point?
#re-publish to dest:
dp=session.delivery_properties(routing_key=dest)
mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id'))
session.message_transfer(message=Message(dp, mp, msg.body))
def assertMessageCount(self, expected, queue):
    """
    Assert that queue_query on self.session reports `expected` as the
    message_count of the named queue.
    """
    reply = self.session.queue_query(queue=queue)
    self.assertEqual(expected, reply.message_count)
def assertMessageId(self, expected, queue):
self.session.message_subscribe(queue=queue, destination="results")
self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1)
self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id'))
self.session.message_cancel(destination="results")
def getMessageProperty(self, msg, prop):
    """
    Return the attribute `prop` of the first entry in msg.headers that
    has such an attribute, or None when no entry has it.
    """
    for header in msg.headers:
        if not hasattr(header, prop):
            continue
        return getattr(header, prop)
    return None
def keepQueuesAlive(self, names):
    """
    Open an extra session named "nasty" on self.conn and, for every
    queue name given, declare the queue (auto_delete) and subscribe to
    it using the queue name as destination.

    Returns the extra session.
    """
    holder = self.conn.session("nasty", 99)
    for queue_name in names:
        holder.queue_declare(queue=queue_name, auto_delete=True)
        holder.message_subscribe(destination=queue_name, queue=queue_name)
    return holder
def createMessage(self, session, key, id, body):
    """
    Publish a message through `session` with `key` as routing key, `id`
    as correlation id and `body` as content.

    NOTE(review): despite its name this method sends the message itself
    and returns nothing; callers in this file pass its result (None) on
    to session.message_transfer - confirm that this is intended.
    """
    delivery = session.delivery_properties(routing_key=key)
    properties = session.message_properties(correlation_id=id)
    outgoing = Message(delivery, properties, body)
    session.message_transfer(message=outgoing)