blob: 33a2149bd22c09488594aacdd8ea4791f78ec293 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.test.unit.xa;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.*;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.ClientProperties;
/**
*
*
*/
public class TopicTest extends AbstractXATestCase
{
/* this class logger */
private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class);
/**
* the topic use by all the tests
*/
private static Topic _topic = null;
/**
* the topic connection factory used by all tests
*/
private static XATopicConnectionFactory _topicFactory = null;
/**
* standard topic connection
*/
private static XATopicConnection _topicConnection = null;
/**
* standard topic session created from the standard connection
*/
private static XATopicSession _session = null;
private static TopicSession _nonXASession = null;
/**
* the topic name
*/
private static final String TOPICNAME = "xaTopic";
/**
* Indicate that a listenere has failed
*/
private static boolean _failure = false;
/** -------------------------------------------------------------------------------------- **/
/** ----------------------------- JUnit support ----------------------------------------- **/
/** -------------------------------------------------------------------------------------- **/
@Override
public void tearDown() throws Exception
{
if (!isBroker08())
{
try
{
_topicConnection.stop();
_topicConnection.close();
}
catch (Exception e)
{
fail("Exception thrown when cleaning standard connection: " + e);
}
}
super.tearDown();
}
/**
* Initialize standard actors
*/
public void init() throws Exception
{
if (!isBroker08())
{
setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1");
// lookup test queue
_topicFactory = (XATopicConnectionFactory) getConnectionFactory();
_topicConnection = getNewTopicXAConnection();
_session = _topicConnection.createXATopicSession();
_topic = _session.createTopic(getTestQueueName());
_nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
init(_session, _topic);
}
}
/** -------------------------------------------------------------------------------------- **/
/** ----------------------------- Test Suite -------------------------------------------- **/
/** -------------------------------------------------------------------------------------- **/
/**
* Uses two transactions respectively with xid1 and xid2 that are use to send a message
* within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2.
* Xid is then committed and a standard transaction is used to receive the message that was sent within xid1.
*/
public void testProducer()
{
if (!isBroker08())
{
_logger.debug("testProducer");
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
try
{
Session nonXASession = _nonXASession;
MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic);
_producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// start the xaResource for xid1
try
{
_logger.debug("starting tx branch xid1");
_xaResource.start(xid1, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
_logger.error("cannot start the transaction with xid1", e);
fail("cannot start the transaction with xid1: " + e.getMessage());
}
try
{
// start the connection
_topicConnection.start();
_logger.debug("produce a message with sequence number 1");
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
}
catch (JMSException e)
{
fail(" cannot send persistent message: " + e.getMessage());
}
_logger.debug("suspend the transaction branch xid1");
try
{
_xaResource.end(xid1, XAResource.TMSUSPEND);
}
catch (XAException e)
{
fail("Cannot end the transaction with xid1: " + e.getMessage());
}
_logger.debug("start the xaResource for xid2");
try
{
_xaResource.start(xid2, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
fail("cannot start the transaction with xid2: " + e.getMessage());
}
try
{
_logger.debug("produce a message");
_message.setLongProperty(_sequenceNumberPropertyName, 2);
_producer.send(_message);
}
catch (JMSException e)
{
fail(" cannot send second persistent message: " + e.getMessage());
}
_logger.debug("end xid2 and start xid1");
try
{
_xaResource.end(xid2, XAResource.TMSUCCESS);
_xaResource.start(xid1, XAResource.TMRESUME);
}
catch (XAException e)
{
fail("Exception when ending and starting transactions: " + e.getMessage());
}
_logger.debug("two phases commit transaction with xid2");
try
{
int resPrepare = _xaResource.prepare(xid2);
if (resPrepare != XAResource.XA_OK)
{
fail("prepare returned: " + resPrepare);
}
_xaResource.commit(xid2, false);
}
catch (XAException e)
{
fail("Exception thrown when preparing transaction with xid2: " + e.getMessage());
}
_logger.debug("receiving a message from topic test we expect it to be the second one");
try
{
TextMessage message = (TextMessage) _consumer.receive(1000);
if (message == null)
{
fail("did not receive second message as expected ");
}
else
{
if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
fail("receive wrong message its sequence number is: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
}
}
catch (JMSException e)
{
fail("Exception when receiving second message: " + e.getMessage());
}
_logger.debug("end and one phase commit the first transaction");
try
{
_xaResource.end(xid1, XAResource.TMSUCCESS);
_xaResource.commit(xid1, true);
}
catch (XAException e)
{
fail("Exception thrown when commiting transaction with xid1");
}
_logger.debug("We should now be able to receive the first and second message");
try
{
TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("did not receive first message as expected ");
}
else
{
if (message1.getLongProperty(_sequenceNumberPropertyName) != 2)
{
fail("receive wrong message its sequence number is: " + message1
.getLongProperty(_sequenceNumberPropertyName));
}
}
message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("did not receive first message as expected ");
}
else
{
if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("receive wrong message its sequence number is: " + message1
.getLongProperty(_sequenceNumberPropertyName));
}
}
_logger.debug("commit transacted session");
nonXASession.commit();
_logger.debug("Test that the topic is now empty");
message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 != null)
{
fail("receive an unexpected message ");
}
}
catch (JMSException e)
{
fail("Exception thrown when emptying the queue: " + e.getMessage());
}
}
catch (JMSException e)
{
fail("cannot create standard consumer: " + e.getMessage());
}
}
}
/**
* strategy: Produce a message within Tx1 and commit tx1. consume this message within tx2 and abort tx2.
* Consume the same message within tx3 and commit it. Check that no more message is available.
*/
public void testDurSub()
{
if (!isBroker08())
{
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
Xid xid3 = getNewXid();
Xid xid4 = getNewXid();
String durSubName = "xaSubDurable";
try
{
TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
try
{
_topicConnection.start();
_logger.debug("start xid1");
_xaResource.start(xid1, XAResource.TMNOFLAGS);
// start the connection
_topicConnection.start();
_logger.debug("produce a message with sequence number 1");
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
_logger.debug("2 phases commit xid1");
_xaResource.end(xid1, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid1) != XAResource.XA_OK)
{
fail("Problem when preparing tx1 ");
}
_xaResource.commit(xid1, false);
}
catch (Exception e)
{
_logger.error("Exception when working with xid1", e);
fail("Exception when working with xid1: " + e.getMessage());
}
try
{
_logger.debug("start xid2");
_xaResource.start(xid2, XAResource.TMNOFLAGS);
_logger.debug("receive the previously produced message");
TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
_logger.debug("rollback xid2");
boolean rollbackOnFailure = false;
try
{
_xaResource.end(xid2, XAResource.TMFAIL);
}
catch (XAException e)
{
if (e.errorCode != XAException.XA_RBROLLBACK)
{
fail("Exception when working with xid2: " + e.getMessage());
}
rollbackOnFailure = true;
}
if (!rollbackOnFailure)
{
if (_xaResource.prepare(xid2) != XAResource.XA_OK)
{
fail("Problem when preparing tx2 ");
}
_xaResource.rollback(xid2);
}
}
catch (Exception e)
{
_logger.error("Exception when working with xid2", e);
fail("Exception when working with xid2: " + e.getMessage());
}
try
{
_logger.debug("start xid3");
_xaResource.start(xid3, XAResource.TMNOFLAGS);
_logger.debug(" receive the previously aborted consumed message");
TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
_logger.debug("commit xid3");
_xaResource.end(xid3, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid3) != XAResource.XA_OK)
{
fail("Problem when preparing tx3 ");
}
_xaResource.commit(xid3, false);
}
catch (Exception e)
{
_logger.error("Exception when working with xid3", e);
fail("Exception when working with xid3: " + e.getMessage());
}
try
{
_logger.debug("start xid4");
_xaResource.start(xid4, XAResource.TMNOFLAGS);
_logger.debug("check that topic is empty");
TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received ");
}
_logger.debug("commit xid4");
_xaResource.end(xid4, XAResource.TMSUCCESS);
_xaResource.commit(xid4, true);
}
catch (Exception e)
{
_logger.error("Exception when working with xid4", e);
fail("Exception when working with xid4: " + e.getMessage());
}
}
catch (Exception e)
{
_logger.error("problem when creating dur sub", e);
fail("problem when creating dur sub: " + e.getMessage());
}
finally
{
try
{
_session.unsubscribe(durSubName);
}
catch (JMSException e)
{
_logger.error("problem when unsubscribing dur sub", e);
fail("problem when unsubscribing dur sub: " + e.getMessage());
}
}
}
}
/**
* strategy: create a XA durable subscriber dusSub, produce 7 messages with the standard session,
* consume 2 messages respectively with tx1, tx2 and tx3
* abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7!
* commit tx3
* abort tx1: we now expect that only messages 5 and 6 are definitly consumed!
* start tx4 and consume messages 1 - 4 and 7
* commit tx4
* Now the topic should be empty!
*/
public void testMultiMessagesDurSub()
{
if (!isBroker08())
{
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
Xid xid3 = getNewXid();
Xid xid4 = getNewXid();
Xid xid6 = getNewXid();
String durSubName = "xaSubDurable";
TextMessage message;
try
{
TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
try
{
Session txSession = _nonXASession;
MessageProducer txProducer = txSession.createProducer(_topic);
_logger.debug("produce 10 persistent messages");
txProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
_topicConnection.start();
for (int i = 1; i <= 7; i++)
{
_message.setLongProperty(_sequenceNumberPropertyName, i);
txProducer.send(_message);
}
// commit txSession
txSession.commit();
}
catch (JMSException e)
{
_logger.error("Exception thrown when producing messages", e);
fail("Exception thrown when producing messages: " + e.getMessage());
}
try
{
_logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3");
//----- start xid1
_xaResource.start(xid1, XAResource.TMNOFLAGS);
// receive the 2 first messages
for (int i = 1; i <= 2; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid1, XAResource.TMSUSPEND);
//----- start xid2
_xaResource.start(xid2, XAResource.TMNOFLAGS);
// receive the 2 first messages
for (int i = 3; i <= 4; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid2, XAResource.TMSUSPEND);
//----- start xid3
_xaResource.start(xid3, XAResource.TMNOFLAGS);
// receive the 2 first messages
for (int i = 5; i <= 6; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid3, XAResource.TMSUCCESS);
}
catch (Exception e)
{
_logger.error("Exception thrown when consumming 6 first messages", e);
fail("Exception thrown when consumming 6 first messages: " + e.getMessage());
}
try
{
_logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7");
_xaResource.start(xid2, XAResource.TMRESUME);
_xaResource.end(xid2, XAResource.TMSUCCESS);
_xaResource.prepare(xid2);
_xaResource.rollback(xid2);
// receive 3 message within tx1: 3, 4 and 7
_xaResource.start(xid1, XAResource.TMRESUME);
_logger.debug(" 3, 4 and 7");
for (int i = 1; i <= 3; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + 3);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message
.getLongProperty(_sequenceNumberPropertyName) || message
.getLongProperty(_sequenceNumberPropertyName) == 6)
{
fail("wrong sequence number: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
}
}
catch (Exception e)
{
_logger.error("Exception thrown when consumming message: 3, 4 and 7", e);
fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage());
}
try
{
_xaResource.end(xid1, XAResource.TMSUCCESS);
_logger.debug(" commit tx3");
_xaResource.commit(xid3, true);
_logger.debug("abort tx1");
_xaResource.prepare(xid1);
_xaResource.rollback(xid1);
}
catch (XAException e)
{
_logger.error("XAException thrown when committing tx3 or aborting tx1", e);
fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage());
}
try
{
// consume messages 1 - 4 + 7
//----- start xid1
_xaResource.start(xid4, XAResource.TMNOFLAGS);
for (int i = 1; i <= 5; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if(message != null)
{
_logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName));
}
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message
.getLongProperty(_sequenceNumberPropertyName) == 6)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid4, XAResource.TMSUCCESS);
_xaResource.prepare(xid4);
_xaResource.commit(xid4, false);
}
catch (Exception e)
{
_logger.error("Exception thrown in last phase", e);
fail("Exception thrown in last phase: " + e.getMessage());
}
// now the topic should be empty!!
try
{
// start xid6
_xaResource.start(xid6, XAResource.TMNOFLAGS);
// should now be empty
message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received " + message
.getLongProperty(_sequenceNumberPropertyName));
}
// commit xid6
_xaResource.end(xid6, XAResource.TMSUCCESS);
_xaResource.commit(xid6, true);
}
catch (Exception e)
{
_logger.error("Exception when working with xid6", e);
fail("Exception when working with xid6: " + e.getMessage());
}
}
catch (Exception e)
{
_logger.error("problem when creating dur sub", e);
fail("problem when creating dur sub: " + e.getMessage());
}
finally
{
try
{
_session.unsubscribe(durSubName);
}
catch (JMSException e)
{
_logger.error("problem when unsubscribing dur sub", e);
fail("problem when unsubscribing dur sub: " + e.getMessage());
}
}
}
}
/**
* strategy: create a XA durable subscriber dusSub, produce 10 messages with the standard session,
* consume 2 messages respectively with tx1, tx2 and tx3
* prepare xid2 and xid3
* crash the server
* Redo the job for xid1 that has been aborted by server crash
* abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7!
* commit tx3
* abort tx1: we now expect that only messages 5 and 6 are definitly consumed!
* start tx4 and consume messages 1 - 4
* start tx5 and consume messages 7 - 10
* abort tx4
* consume messages 1-4 with tx5
* commit tx5
* Now the topic should be empty!
*/
public void testMultiMessagesDurSubCrash()
{
if (!isBroker08())
{
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
Xid xid3 = getNewXid();
Xid xid4 = getNewXid();
Xid xid5 = getNewXid();
Xid xid6 = getNewXid();
String durSubName = "xaSubDurable";
TextMessage message;
try
{
TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
try
{
Session txSession = _nonXASession;
MessageProducer txProducer = txSession.createProducer(_topic);
// produce 10 persistent messages
txProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
_topicConnection.start();
for (int i = 1; i <= 10; i++)
{
_message.setLongProperty(_sequenceNumberPropertyName, i);
txProducer.send(_message);
}
// commit txSession
txSession.commit();
}
catch (JMSException e)
{
_logger.error("Exception thrown when producing messages", e);
fail("Exception thrown when producing messages: " + e.getMessage());
}
try
{
// consume 2 messages respectively with tx1, tx2 and tx3
//----- start xid1
_xaResource.start(xid1, XAResource.TMNOFLAGS);
// receive the 2 first messages
for (int i = 1; i <= 2; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid1, XAResource.TMSUCCESS);
//----- start xid2
_xaResource.start(xid2, XAResource.TMNOFLAGS);
// receive the 2 first messages
for (int i = 3; i <= 4; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid2, XAResource.TMSUCCESS);
//----- start xid3
_xaResource.start(xid3, XAResource.TMNOFLAGS);
// receive the 2 first messages
for (int i = 5; i <= 6; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid3, XAResource.TMSUCCESS);
// prepare tx2 and tx3
_xaResource.prepare(xid2);
_xaResource.prepare(xid3);
}
catch (Exception e)
{
_logger.error("Exception thrown when consumming 6 first messages", e);
fail("Exception thrown when consumming 6 first messages: " + e.getMessage());
}
/////// stop the broker now !!
try
{
restartDefaultBroker();
init();
}
catch (Exception e)
{
fail("Exception when stopping and restarting the server");
}
// get the list of in doubt transactions
try
{
_topicConnection.start();
// reconnect to dursub!
xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
if (inDoubt == null)
{
fail("the array of in doubt transactions should not be null ");
}
// At that point we expect only two indoubt transactions:
if (inDoubt.length != 2)
{
fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions");
}
}
catch (XAException e)
{
_logger.error("exception thrown when recovering transactions", e);
fail("exception thrown when recovering transactions " + e.getMessage());
}
try
{
// xid1 has been aborted redo the job!
// consume 2 messages with tx1
//----- start xid1
_xaResource.start(xid1, XAResource.TMNOFLAGS);
// receive the 2 first messages
for (int i = 1; i <= 2; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
_xaResource.end(xid1, XAResource.TMSUSPEND);
// abort tx2, we now expect to receive messages 3 and 4 first!
_xaResource.rollback(xid2);
// receive 3 message within tx1: 3, 4 and 7
_xaResource.start(xid1, XAResource.TMRESUME);
// receive messages 3, 4 and 7
Set<Long> expected = new HashSet<Long>();
expected.add(3L);
expected.add(4L);
expected.add(7L);
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected one of: " + expected);
}
else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
fail("wrong sequence number: " + message
.getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected);
}
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected one of: " + expected);
}
else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
fail("wrong sequence number: " + message
.getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected);
}
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected one of: " + expected);
}
else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
fail("wrong sequence number: " + message
.getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected);
}
}
catch (Exception e)
{
_logger.error("Exception thrown when consumming message: 3, 4 and 7", e);
fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage());
}
try
{
_xaResource.end(xid1, XAResource.TMSUSPEND);
// commit tx3
_xaResource.commit(xid3, false);
// abort tx1
_xaResource.prepare(xid1);
_xaResource.rollback(xid1);
}
catch (XAException e)
{
_logger.error("XAException thrown when committing tx3 or aborting tx1", e);
fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage());
}
try
{
// consume messages: could be any from (1 - 4, 7-10)
//----- start xid4
Set<Long> expected = new HashSet<Long>();
Set<Long> xid4msgs = new HashSet<Long>();
for(long l = 1; l <= 4l; l++)
{
expected.add(l);
}
for(long l = 7; l <= 10l; l++)
{
expected.add(l);
}
_xaResource.start(xid4, XAResource.TMNOFLAGS);
for (int i = 1; i <= 4; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
long seqNo = message.getLongProperty(_sequenceNumberPropertyName);
xid4msgs.add(seqNo);
if (!expected.remove(seqNo))
{
fail("wrong sequence number: " + seqNo +
" expected one from " + expected);
}
}
_xaResource.end(xid4, XAResource.TMSUSPEND);
// consume messages 8 - 10
_xaResource.start(xid5, XAResource.TMNOFLAGS);
for (int i = 7; i <= 10; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)
+ " expected one from " + expected);
}
}
_xaResource.end(xid5, XAResource.TMSUSPEND);
// abort tx4
_xaResource.prepare(xid4);
_xaResource.rollback(xid4);
expected.addAll(xid4msgs);
// consume messages 1-4 with tx5
_xaResource.start(xid5, XAResource.TMRESUME);
for (int i = 1; i <= 4; i++)
{
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
}
else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName)))
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)
+ " expected one from " + expected);
}
}
_xaResource.end(xid5, XAResource.TMSUSPEND);
// commit tx5
_xaResource.prepare(xid5);
_xaResource.commit(xid5, false);
}
catch (Exception e)
{
_logger.error("Exception thrown in last phase", e);
fail("Exception thrown in last phase: " + e.getMessage());
}
// now the topic should be empty!!
try
{
// start xid6
_xaResource.start(xid6, XAResource.TMNOFLAGS);
// should now be empty
message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received " + message
.getLongProperty(_sequenceNumberPropertyName));
}
// commit xid6
_xaResource.end(xid6, XAResource.TMSUSPEND);
_xaResource.commit(xid6, true);
}
catch (Exception e)
{
_logger.error("Exception when working with xid6", e);
fail("Exception when working with xid6: " + e.getMessage());
}
}
catch (Exception e)
{
_logger.error("problem when creating dur sub", e);
fail("problem when creating dur sub: " + e.getMessage());
}
finally
{
try
{
_session.unsubscribe(durSubName);
}
catch (JMSException e)
{
_logger.error("problem when unsubscribing dur sub", e);
fail("problem when unsubscribing dur sub: " + e.getMessage());
}
}
}
}
/**
* strategy: Produce a message within Tx1 and commit tx1. a durable subscriber then receives that message within tx2
* that is then prepared.
* Shutdown the server and get the list of in doubt transactions:
* we expect tx2, Tx2 is aborted and the message consumed within tx3 that is committed we then check that the topic is empty.
*/
public void testDurSubCrash()
{
if (!isBroker08())
{
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
Xid xid3 = getNewXid();
Xid xid4 = getNewXid();
String durSubName = "xaSubDurable";
try
{
TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
try
{
_topicConnection.start();
//----- start xid1
_xaResource.start(xid1, XAResource.TMNOFLAGS);
// start the connection
_topicConnection.start();
// produce a message with sequence number 1
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
// commit
_xaResource.end(xid1, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid1) != XAResource.XA_OK)
{
fail("Problem when preparing tx1 ");
}
_xaResource.commit(xid1, false);
}
catch (Exception e)
{
_logger.error("Exception when working with xid1", e);
fail("Exception when working with xid1: " + e.getMessage());
}
try
{
// start xid2
_xaResource.start(xid2, XAResource.TMNOFLAGS);
// receive the previously produced message
TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
// prepare xid2
_xaResource.end(xid2, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid2) != XAResource.XA_OK)
{
fail("Problem when preparing tx2 ");
}
}
catch (Exception e)
{
_logger.error("Exception when working with xid2", e);
fail("Exception when working with xid2: " + e.getMessage());
}
/////// stop the server now !!
try
{
restartDefaultBroker();
init();
}
catch (Exception e)
{
fail("Exception when stopping and restarting the server");
}
// get the list of in doubt transactions
try
{
_topicConnection.start();
// reconnect to dursub!
xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
if (inDoubt == null)
{
fail("the array of in doubt transactions should not be null ");
}
// At that point we expect only two indoubt transactions:
if (inDoubt.length != 1)
{
fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions");
}
// commit them
for (Xid anInDoubt : inDoubt)
{
if (anInDoubt.equals(xid2))
{
_logger.info("aborting xid2 ");
try
{
_xaResource.rollback(anInDoubt);
}
catch (Exception e)
{
_logger.error("exception when aborting xid2 ", e);
fail("exception when aborting xid2 ");
}
}
else
{
_logger.info("XID2 is not in doubt ");
}
}
}
catch (XAException e)
{
_logger.error("exception thrown when recovering transactions", e);
fail("exception thrown when recovering transactions " + e.getMessage());
}
try
{
// start xid3
_xaResource.start(xid3, XAResource.TMNOFLAGS);
// receive the previously produced message and aborted
TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
// commit xid3
_xaResource.end(xid3, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid3) != XAResource.XA_OK)
{
fail("Problem when preparing tx3 ");
}
_xaResource.commit(xid3, false);
}
catch (Exception e)
{
_logger.error("Exception when working with xid3", e);
fail("Exception when working with xid3: " + e.getMessage());
}
try
{
// start xid4
_xaResource.start(xid4, XAResource.TMNOFLAGS);
// should now be empty
TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received " + message
.getLongProperty(_sequenceNumberPropertyName));
}
// commit xid4
_xaResource.end(xid4, XAResource.TMSUCCESS);
_xaResource.commit(xid4, true);
}
catch (Exception e)
{
_logger.error("Exception when working with xid4", e);
fail("Exception when working with xid4: " + e.getMessage());
}
}
catch (Exception e)
{
_logger.error("problem when creating dur sub", e);
fail("problem when creating dur sub: " + e.getMessage());
}
finally
{
try
{
_session.unsubscribe(durSubName);
}
catch (JMSException e)
{
_logger.error("problem when unsubscribing dur sub", e);
fail("problem when unsubscribing dur sub: " + e.getMessage());
}
}
}
}
/**
* strategy: Produce a message within Tx1 and prepare tx1. Shutdown the server and get the list of indoubt transactions:
* we expect tx1, Tx1 is committed so we expect the test topic not to be empty!
*/
public void testRecover()
{
if (!isBroker08())
{
Xid xid1 = getNewXid();
String durSubName = "test1";
try
{
// create a dummy durable subscriber to be sure that messages are persisted!
_nonXASession.createDurableSubscriber(_topic, durSubName);
// start the xaResource for xid1
try
{
_xaResource.start(xid1, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
fail("cannot start the transaction with xid1: " + e.getMessage());
}
try
{
// start the connection
_topicConnection.start();
// produce a message with sequence number 1
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
}
catch (JMSException e)
{
fail(" cannot send persistent message: " + e.getMessage());
}
// suspend the transaction
try
{
_xaResource.end(xid1, XAResource.TMSUCCESS);
}
catch (XAException e)
{
fail("Cannot end the transaction with xid1: " + e.getMessage());
}
// prepare the transaction with xid1
try
{
_xaResource.prepare(xid1);
}
catch (XAException e)
{
fail("Exception when preparing xid1: " + e.getMessage());
}
/////// stop the server now !!
try
{
restartDefaultBroker();
init();
}
catch (Exception e)
{
fail("Exception when stopping and restarting the server");
}
try
{
MessageConsumer nonXAConsumer = _nonXASession.createDurableSubscriber(_topic, durSubName);
_topicConnection.start();
// get the list of in doubt transactions
try
{
Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
if (inDoubt == null)
{
fail("the array of in doubt transactions should not be null ");
}
// At that point we expect only two indoubt transactions:
if (inDoubt.length != 1)
{
fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
}
// commit them
for (Xid anInDoubt : inDoubt)
{
if (anInDoubt.equals(xid1))
{
_logger.debug("committing xid1 ");
try
{
_xaResource.commit(anInDoubt, false);
}
catch (Exception e)
{
_logger.error("PB when aborted xid1");
fail("exception when committing xid1 ");
}
}
else
{
_logger.debug("XID1 is not in doubt ");
}
}
}
catch (XAException e)
{
_logger.error("exception thrown when recovering transactions ", e);
fail("exception thrown when recovering transactions " + e.getMessage());
}
_logger.debug("the topic should not be empty");
TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("The topic is empty! ");
}
}
catch (Exception e)
{
_logger.error("Exception thrown when testin that queue test is empty", e);
fail("Exception thrown when testin that queue test is empty: " + e.getMessage());
}
}
catch (JMSException e)
{
_logger.error("cannot create dummy durable subscriber", e);
fail("cannot create dummy durable subscriber: " + e.getMessage());
}
finally
{
try
{
// unsubscribe the dummy durable subscriber
TopicSession nonXASession = _nonXASession;
nonXASession.unsubscribe(durSubName);
}
catch (JMSException e)
{
fail("cannot unsubscribe durable subscriber: " + e.getMessage());
}
}
}
}
/**
* strategy:
* create a standard durable subscriber
* produce 3 messages
* consume the first message with that durable subscriber
* close the standard session that deactivates the durable subscriber
* migrate the durable subscriber to an xa one
* consume the second message with that xa durable subscriber
* close the xa session that deactivates the durable subscriber
* reconnect to the durable subscriber with a standard session
* consume the two remaining messages and check that the topic is empty!
*/
public void testMigrateDurableSubscriber()
{
if (!isBroker08())
{
Xid xid1 = getNewXid();
Xid xid2 = getNewXid();
String durSubName = "DurableSubscriberMigrate";
try
{
Session stSession = _nonXASession;
MessageProducer producer = stSession.createProducer(_topic);
_logger.debug("Create a standard durable subscriber!");
TopicSubscriber durSub = stSession.createDurableSubscriber(_topic, durSubName);
TopicSubscriber durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second");
TextMessage message;
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
_topicConnection.start();
_logger.debug("produce 3 messages");
for (int i = 1; i <= 3; i++)
{
_message.setLongProperty(_sequenceNumberPropertyName, i);
//producer.send( _message );
producer.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0);
stSession.commit();
}
_logger.debug("consume the first message with that durable subscriber");
message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
// commit the standard session
stSession.commit();
_logger.debug("first message consumed ");
// close the session that deactivates the durable subscriber
stSession.close();
_logger.debug("migrate the durable subscriber to an xa one");
_xaResource.start(xid1, XAResource.TMNOFLAGS);
durSub = _session.createDurableSubscriber(_topic, durSubName);
_logger.debug(" consume the second message with that xa durable subscriber and abort it");
message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
_logger.info("wrong sequence number, 2 expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
_xaResource.end(xid1, XAResource.TMSUCCESS);
_xaResource.prepare(xid1);
_xaResource.rollback(xid1);
_logger.debug("close the session that deactivates the durable subscriber");
_session.close();
_logger.debug("create a new standard session");
stSession = _topicConnection.createTopicSession(true, 1);
_logger.debug("reconnect to the durable subscriber");
durSub = stSession.createDurableSubscriber(_topic, durSubName);
durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second");
_logger.debug("Reconnected to durablse subscribers");
_logger.debug(" consume the 2 remaining messages");
message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
_logger.info("wrong sequence number, 2 expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
// consume the third message with that xa durable subscriber
message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 3)
{
_logger.info("wrong sequence number, 3 expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
stSession.commit();
_logger.debug("the topic should be empty now");
message = (TextMessage) durSub.receive(1000);
if (message != null)
{
fail("Received unexpected message ");
}
stSession.commit();
_logger.debug(" use dursub1 to receive all the 3 messages");
for (int i = 1; i <= 3; i++)
{
message = (TextMessage) durSub1.receive(1000);
if (message == null)
{
_logger.debug("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
fail("wrong sequence number, " + i + " expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
}
stSession.commit();
// send a non persistent message to check that all persistent messages are deleted
producer = stSession.createProducer(_topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(_message);
stSession.commit();
message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("message not received ");
}
message = (TextMessage) durSub1.receive(1000);
if (message == null)
{
fail("message not received ");
}
stSession.commit();
stSession.close();
_logger.debug(" now create a standard non transacted session and reconnect to the durable xubscriber");
TopicConnection stConnection =
_topicConnection; //_topicFactory.createTopicConnection("guest", "guest");
TopicSession autoAclSession = stConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = autoAclSession.createPublisher(_topic);
durSub = autoAclSession.createDurableSubscriber(_topic, durSubName);
stConnection.start();
// produce 3 persistent messages
for (int i = 1; i <= 3; i++)
{
_message.setLongProperty(_sequenceNumberPropertyName, i);
//producer.send( _message );
publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0);
}
_logger.debug(" use dursub to receive all the 3 messages");
for (int i = 1; i <= 3; i++)
{
message = (TextMessage) durSub.receive(1000);
if (message == null)
{
_logger.info("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
_logger.info("wrong sequence number, " + i + " expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
}
_logger.debug("now set a message listener");
AtomicBoolean lock = new AtomicBoolean(true);
reset();
stConnection.stop();
durSub.setMessageListener(new TopicListener(1, 3, lock));
_logger.debug(" produce 3 persistent messages");
for (int i = 1; i <= 3; i++)
{
_message.setLongProperty(_sequenceNumberPropertyName, i);
//producer.send( _message );
publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0);
}
// start the connection
stConnection.start();
while (lock.get())
{
synchronized (lock)
{
lock.wait();
}
}
if (getFailureStatus())
{
fail("problem with message listener");
}
stConnection.stop();
durSub.setMessageListener(null);
_logger.debug(" do the same with an xa session");
// produce 3 persistent messages
for (int i = 1; i <= 3; i++)
{
_message.setLongProperty(_sequenceNumberPropertyName, i);
//producer.send( _message );
publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0);
}
//stConnection.close();
autoAclSession.close();
_logger.debug(" migrate the durable subscriber to an xa one");
_session = _topicConnection.createXATopicSession();
_xaResource = _session.getXAResource();
_xaResource.start(xid2, XAResource.TMNOFLAGS);
durSub = _session.createDurableSubscriber(_topic, durSubName);
lock = new AtomicBoolean();
reset();
_topicConnection.stop();
durSub.setMessageListener(new TopicListener(1, 3, lock));
// start the connection
_topicConnection.start();
while (lock.get())
{
synchronized (lock)
{
lock.wait();
}
}
if (getFailureStatus())
{
fail("problem with XA message listener");
}
_xaResource.end(xid2, XAResource.TMSUCCESS);
_xaResource.commit(xid2, true);
_session.close();
}
catch (Exception e)
{
_logger.error("Exception thrown", e);
fail("Exception thrown: " + e.getMessage());
}
finally
{
try
{
_topicConnection.createXASession().unsubscribe(durSubName);
_topicConnection.createXASession().unsubscribe(durSubName + "_second");
}
catch (JMSException e)
{
fail("Exception thrown when unsubscribing durable subscriber " + e.getMessage());
}
}
}
}
/** -------------------------------------------------------------------------------------- **/
/** ----------------------------- Utility methods --------------------------------------- **/
/** -------------------------------------------------------------------------------------- **/
/**
* get a new queue connection
*
* @return a new queue connection
* @throws javax.jms.JMSException If the JMS provider fails to create the queue connection
* due to some internal error or in case of authentication failure
*/
private XATopicConnection getNewTopicXAConnection() throws JMSException
{
return _topicFactory.createXATopicConnection("guest", "guest");
}
public static void failure()
{
_failure = true;
}
public static void reset()
{
_failure = false;
}
public static boolean getFailureStatus()
{
return _failure;
}
private class TopicListener implements MessageListener
{
private long _counter;
private long _end;
private final AtomicBoolean _lock;
public TopicListener(long init, long end, AtomicBoolean lock)
{
_counter = init;
_end = end;
_lock = lock;
}
public void onMessage(Message message)
{
long seq = 0;
try
{
seq = message.getLongProperty(TopicTest._sequenceNumberPropertyName);
}
catch (JMSException e)
{
_logger.error("Error getting long property: " + TopicTest._sequenceNumberPropertyName , e);
TopicTest.failure();
_lock.set(false);
synchronized (_lock)
{
_lock.notifyAll();
}
}
if (seq != _counter)
{
_logger.info("received message " + seq + " expected " + _counter);
TopicTest.failure();
_lock.set(false);
synchronized (_lock)
{
_lock.notifyAll();
}
}
_counter++;
if (_counter > _end)
{
_lock.set(false);
synchronized (_lock)
{
_lock.notifyAll();
}
}
}
}
}