blob: 941f973c6071316108b87eaf4539cc592eb82c76 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.test.unit.xa;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.XAQueueSession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QueueTest extends AbstractXATestCase
{
private static final Logger _logger = LoggerFactory.getLogger(QueueTest.class);
/**
* the queue use by all the tests
*/
private static Queue _queue = null;
/**
* the queue connection factory used by all tests
*/
private static XAQueueConnectionFactory _queueFactory = null;
/**
* standard xa queue connection
*/
private static XAQueueConnection _xaqueueConnection = null;
/**
* standard xa queue connection
*/
private static QueueConnection _queueConnection = null;
/**
* standard queue session created from the standard connection
*/
private static QueueSession _nonXASession = null;
/** ----------------------------------------------------------------------------------- **/
/**
* ----------------------------- JUnit support ----------------------------------------- *
*/
@Override
public void tearDown() throws Exception
{
try
{
if (_xaqueueConnection != null)
{
_xaqueueConnection.close();
}
if (_queueConnection != null)
{
_queueConnection.close();
}
}
finally
{
super.tearDown();
}
}
/**
* Initialize standard actors
*/
public void init() throws Exception
{
if (!isBroker08())
{
_queueFactory = (XAQueueConnectionFactory) getConnectionFactory();
_xaqueueConnection= getNewQueueXAConnection();
final XAQueueSession session = _xaqueueConnection.createXAQueueSession();
_queue = session.createQueue(getTestQueueName());
_queueConnection = _queueFactory.createQueueConnection(GUEST_USERNAME, GUEST_PASSWORD);
_nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
init(session, _queue);
}
}
/** -------------------------------------------------------------------------------------- **/
/** ----------------------------- Test Suite -------------------------------------------- **/
/** -------------------------------------------------------------------------------------- **/
/**
* Uses two transactions respectively with xid1 and xid2 that are used to send a message
* within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2.
* Xid is then committed and a standard transaction is used to receive the message that was sent within xid1.
*/
public void testProducer()
{
if (!isBroker08())
{
_logger.debug("running testProducer");
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
// start the xaResource for xid1
try
{
_xaResource.start(xid1, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
_logger.error("cannot start the transaction with xid1", e);
fail("cannot start the transaction with xid1: " + e.getMessage());
}
try
{
// start the connection
_xaqueueConnection.start();
// produce a message with sequence number 1
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
}
catch (JMSException e)
{
fail(" cannot send persistent message: " + e.getMessage());
}
// suspend the transaction
try
{
_xaResource.end(xid1, XAResource.TMSUSPEND);
}
catch (XAException e)
{
fail("Cannot end the transaction with xid1: " + e.getMessage());
}
// start the xaResource for xid2
try
{
_xaResource.start(xid2, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
fail("cannot start the transaction with xid2: " + e.getMessage());
}
try
{
// produce a message
_message.setLongProperty(_sequenceNumberPropertyName, 2);
_producer.send(_message);
}
catch (JMSException e)
{
fail(" cannot send second persistent message: " + e.getMessage());
}
// end xid2 and start xid1
try
{
_xaResource.end(xid2, XAResource.TMSUCCESS);
_xaResource.start(xid1, XAResource.TMRESUME);
}
catch (XAException e)
{
fail("Exception when ending and starting transactions: " + e.getMessage());
}
// two phases commit transaction with xid2
try
{
int resPrepare = _xaResource.prepare(xid2);
if (resPrepare != XAResource.XA_OK)
{
fail("prepare returned: " + resPrepare);
}
_xaResource.commit(xid2, false);
}
catch (XAException e)
{
fail("Exception thrown when preparing transaction with xid2: " + e.getMessage());
}
// receive a message from queue test we expect it to be the second one
try
{
TextMessage message = (TextMessage) _consumer.receive(1000);
if (message == null)
{
fail("did not receive second message as expected ");
}
else
{
if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
fail("receive wrong message its sequence number is: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
}
}
catch (JMSException e)
{
fail("Exception when receiving second message: " + e.getMessage());
}
// end and one phase commit the first transaction
try
{
_xaResource.end(xid1, XAResource.TMSUCCESS);
_xaResource.commit(xid1, true);
}
catch (XAException e)
{
fail("Exception thrown when commiting transaction with xid1");
}
// We should now be able to receive the first message
try
{
_xaqueueConnection.close();
Session nonXASession = _nonXASession;
MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
_queueConnection.start();
TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("did not receive first message as expected ");
}
else
{
if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("receive wrong message its sequence number is: " + message1
.getLongProperty(_sequenceNumberPropertyName));
}
}
// commit that transacted session
nonXASession.commit();
// the queue should be now empty
message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 != null)
{
fail("receive an unexpected message ");
}
}
catch (JMSException e)
{
fail("Exception thrown when emptying the queue: " + e.getMessage());
}
}
}
/**
* strategy: Produce a message within Tx1 and prepare tx1. crash the server then commit tx1 and consume the message
*/
public void testSendAndRecover()
{
if (!isBroker08())
{
_logger.debug("running testSendAndRecover");
Xid xid1 = getNewXid();
// start the xaResource for xid1
try
{
_xaResource.start(xid1, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
fail("cannot start the transaction with xid1: " + e.getMessage());
}
try
{
// start the connection
_xaqueueConnection.start();
// produce a message with sequence number 1
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
}
catch (JMSException e)
{
fail(" cannot send persistent message: " + e.getMessage());
}
// suspend the transaction
try
{
_xaResource.end(xid1, XAResource.TMSUCCESS);
}
catch (XAException e)
{
fail("Cannot end the transaction with xid1: " + e.getMessage());
}
// prepare the transaction with xid1
try
{
_xaResource.prepare(xid1);
}
catch (XAException e)
{
fail("Exception when preparing xid1: " + e.getMessage());
}
/////// stop the server now !!
try
{
_logger.debug("stopping broker");
restartDefaultBroker();
init();
}
catch (Exception e)
{
fail("Exception when stopping and restarting the server");
}
// get the list of in doubt transactions
try
{
Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
if (inDoubt == null)
{
fail("the array of in doubt transactions should not be null ");
}
// At that point we expect only two indoubt transactions:
if (inDoubt.length != 1)
{
fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
}
// commit them
for (Xid anInDoubt : inDoubt)
{
if (anInDoubt.equals(xid1))
{
_logger.info("commit xid1 ");
try
{
_xaResource.commit(anInDoubt, false);
}
catch (Exception e)
{
_logger.error("PB when aborted xid1", e);
}
}
else
{
fail("did not receive right xid ");
}
}
}
catch (XAException e)
{
_logger.error("exception thrown when recovering transactions", e);
fail("exception thrown when recovering transactions " + e.getMessage());
}
// the queue should contain the first message!
try
{
_xaqueueConnection.close();
Session nonXASession = _nonXASession;
MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
_queueConnection.start();
TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("queue does not contain any message!");
}
if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("Wrong message returned! Sequence number is " + message1
.getLongProperty(_sequenceNumberPropertyName));
}
nonXASession.commit();
}
catch (JMSException e)
{
fail("Exception thrown when testin that queue test is not empty: " + e.getMessage());
}
}
}
/**
* strategy: Produce a message within Tx1 and prepare tx1. Produce a standard message and consume
* it within tx2 and prepare tx2. Shutdown the server and get the list of in doubt transactions:
* we expect tx1 and tx2! Then, Tx1 is aborted and tx2 is committed so we expect the test's queue to be empty!
*/
public void testRecover()
{
if (!isBroker08())
{
_logger.debug("running testRecover");
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
// start the xaResource for xid1
try
{
_xaResource.start(xid1, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
fail("cannot start the transaction with xid1: " + e.getMessage());
}
try
{
// start the connection
_xaqueueConnection.start();
// produce a message with sequence number 1
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
}
catch (JMSException e)
{
fail(" cannot send persistent message: " + e.getMessage());
}
// suspend the transaction
try
{
_xaResource.end(xid1, XAResource.TMSUCCESS);
}
catch (XAException e)
{
fail("Cannot end the transaction with xid1: " + e.getMessage());
}
// prepare the transaction with xid1
try
{
_xaResource.prepare(xid1);
}
catch (XAException e)
{
fail("Exception when preparing xid1: " + e.getMessage());
}
// send a message using the standard session
try
{
Session nonXASession = _nonXASession;
MessageProducer nonXAProducer = nonXASession.createProducer(_queue);
TextMessage message2 = nonXASession.createTextMessage();
message2.setText("non XA ");
message2.setLongProperty(_sequenceNumberPropertyName, 2);
nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
nonXAProducer.send(message2);
// commit that transacted session
nonXASession.commit();
}
catch (Exception e)
{
fail("Exception thrown when emptying the queue: " + e.getMessage());
}
// start the xaResource for xid2
try
{
_xaResource.start(xid2, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
fail("cannot start the transaction with xid1: " + e.getMessage());
}
// receive a message from queue test we expect it to be the second one
try
{
TextMessage message = (TextMessage) _consumer.receive(1000);
if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
fail("did not receive second message as expected ");
}
}
catch (JMSException e)
{
fail("Exception when receiving second message: " + e.getMessage());
}
// suspend the transaction
try
{
_xaResource.end(xid2, XAResource.TMSUCCESS);
}
catch (XAException e)
{
fail("Cannot end the transaction with xid2: " + e.getMessage());
}
// prepare the transaction with xid1
try
{
_xaResource.prepare(xid2);
}
catch (XAException e)
{
fail("Exception when preparing xid2: " + e.getMessage());
}
/////// stop the server now !!
try
{
_logger.debug("stopping broker");
restartDefaultBroker();
init();
}
catch (Exception e)
{
fail("Exception when stopping and restarting the server");
}
// get the list of in doubt transactions
try
{
Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
if (inDoubt == null)
{
fail("the array of in doubt transactions should not be null ");
}
// At that point we expect only two indoubt transactions:
if (inDoubt.length != 2)
{
fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
}
// commit them
for (Xid anInDoubt : inDoubt)
{
if (anInDoubt.equals(xid1))
{
_logger.debug("rollback xid1 ");
try
{
_xaResource.rollback(anInDoubt);
}
catch (Exception e)
{
_logger.error("PB when aborted xid1", e);
}
}
else if (anInDoubt.equals(xid2))
{
_logger.debug("commit xid2 ");
try
{
_xaResource.commit(anInDoubt, false);
}
catch (Exception e)
{
_logger.error("PB when commiting xid2", e);
}
}
}
}
catch (XAException e)
{
_logger.error("exception thrown when recovering transactions", e);
fail("exception thrown when recovering transactions " + e.getMessage());
}
// the queue should be empty
try
{
_xaqueueConnection.close();
Session nonXASession = _nonXASession;
MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
_queueConnection.start();
TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 != null)
{
fail("The queue is not empty! " + message1.getLongProperty(_sequenceNumberPropertyName));
}
}
catch (JMSException e)
{
fail("Exception thrown when testin that queue test is empty: " + e.getMessage());
}
}
}
/** -------------------------------------------------------------------------------------- **/
/** ----------------------------- Utility methods --------------------------------------- **/
/** -------------------------------------------------------------------------------------- **/
/**
* get a new queue connection
*
* @return a new queue connection
* @throws JMSException If the JMS provider fails to create the queue connection
* due to some internal error or in case of authentication failure
*/
private XAQueueConnection getNewQueueXAConnection() throws JMSException
{
return _queueFactory.createXAQueueConnection("guest", "guest");
}
}