| /* Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.test.unit.xa; |
| |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import javax.jms.*; |
| import javax.transaction.xa.XAException; |
| import javax.transaction.xa.XAResource; |
| import javax.transaction.xa.Xid; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.configuration.ClientProperties; |
| |
| /** |
| * |
| * |
| */ |
| public class TopicTest extends AbstractXATestCase |
| { |
| /* this class logger */ |
| private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class); |
| |
| /** |
| * the topic use by all the tests |
| */ |
| private static Topic _topic = null; |
| |
| /** |
| * the topic connection factory used by all tests |
| */ |
| private static XATopicConnectionFactory _topicFactory = null; |
| |
| /** |
| * standard topic connection |
| */ |
| private static XATopicConnection _topicConnection = null; |
| |
| /** |
| * standard topic session created from the standard connection |
| */ |
| private static XATopicSession _session = null; |
| |
| private static TopicSession _nonXASession = null; |
| |
| /** |
| * the topic name |
| */ |
| private static final String TOPICNAME = "xaTopic"; |
| |
| /** |
| * Indicate that a listenere has failed |
| */ |
| private static boolean _failure = false; |
| |
| /** -------------------------------------------------------------------------------------- **/ |
| /** ----------------------------- JUnit support ----------------------------------------- **/ |
| /** -------------------------------------------------------------------------------------- **/ |
| |
| @Override |
| public void tearDown() throws Exception |
| { |
| if (!isBroker08()) |
| { |
| try |
| { |
| _topicConnection.stop(); |
| _topicConnection.close(); |
| } |
| catch (Exception e) |
| { |
| fail("Exception thrown when cleaning standard connection: " + e); |
| } |
| } |
| super.tearDown(); |
| } |
| |
| /** |
| * Initialize standard actors |
| */ |
| public void init() throws Exception |
| { |
| if (!isBroker08()) |
| { |
| setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1"); |
| // lookup test queue |
| _topicFactory = (XATopicConnectionFactory) getConnectionFactory(); |
| _topicConnection = getNewTopicXAConnection(); |
| _session = _topicConnection.createXATopicSession(); |
| _topic = _session.createTopic(getTestQueueName()); |
| _nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); |
| init(_session, _topic); |
| } |
| } |
| |
| /** -------------------------------------------------------------------------------------- **/ |
| /** ----------------------------- Test Suite -------------------------------------------- **/ |
| /** -------------------------------------------------------------------------------------- **/ |
| |
| |
| /** |
| * Uses two transactions respectively with xid1 and xid2 that are use to send a message |
| * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. |
| * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. |
| */ |
| public void testProducer() |
| { |
| if (!isBroker08()) |
| { |
| _logger.debug("testProducer"); |
| Xid xid1 = getNewXid(); |
| Xid xid2 = getNewXid(); |
| try |
| { |
| Session nonXASession = _nonXASession; |
| MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic); |
| _producer.setDeliveryMode(DeliveryMode.PERSISTENT); |
| // start the xaResource for xid1 |
| try |
| { |
| _logger.debug("starting tx branch xid1"); |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| } |
| catch (XAException e) |
| { |
| _logger.error("cannot start the transaction with xid1", e); |
| fail("cannot start the transaction with xid1: " + e.getMessage()); |
| } |
| try |
| { |
| // start the connection |
| _topicConnection.start(); |
| _logger.debug("produce a message with sequence number 1"); |
| _message.setLongProperty(_sequenceNumberPropertyName, 1); |
| _producer.send(_message); |
| } |
| catch (JMSException e) |
| { |
| fail(" cannot send persistent message: " + e.getMessage()); |
| } |
| _logger.debug("suspend the transaction branch xid1"); |
| try |
| { |
| _xaResource.end(xid1, XAResource.TMSUSPEND); |
| } |
| catch (XAException e) |
| { |
| fail("Cannot end the transaction with xid1: " + e.getMessage()); |
| } |
| _logger.debug("start the xaResource for xid2"); |
| try |
| { |
| _xaResource.start(xid2, XAResource.TMNOFLAGS); |
| } |
| catch (XAException e) |
| { |
| fail("cannot start the transaction with xid2: " + e.getMessage()); |
| } |
| try |
| { |
| _logger.debug("produce a message"); |
| _message.setLongProperty(_sequenceNumberPropertyName, 2); |
| _producer.send(_message); |
| } |
| catch (JMSException e) |
| { |
| fail(" cannot send second persistent message: " + e.getMessage()); |
| } |
| _logger.debug("end xid2 and start xid1"); |
| try |
| { |
| _xaResource.end(xid2, XAResource.TMSUCCESS); |
| _xaResource.start(xid1, XAResource.TMRESUME); |
| } |
| catch (XAException e) |
| { |
| fail("Exception when ending and starting transactions: " + e.getMessage()); |
| } |
| _logger.debug("two phases commit transaction with xid2"); |
| try |
| { |
| int resPrepare = _xaResource.prepare(xid2); |
| if (resPrepare != XAResource.XA_OK) |
| { |
| fail("prepare returned: " + resPrepare); |
| } |
| _xaResource.commit(xid2, false); |
| } |
| catch (XAException e) |
| { |
| fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); |
| } |
| _logger.debug("receiving a message from topic test we expect it to be the second one"); |
| try |
| { |
| TextMessage message = (TextMessage) _consumer.receive(1000); |
| if (message == null) |
| { |
| fail("did not receive second message as expected "); |
| } |
| else |
| { |
| if (message.getLongProperty(_sequenceNumberPropertyName) != 2) |
| { |
| fail("receive wrong message its sequence number is: " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| } |
| catch (JMSException e) |
| { |
| fail("Exception when receiving second message: " + e.getMessage()); |
| } |
| _logger.debug("end and one phase commit the first transaction"); |
| try |
| { |
| _xaResource.end(xid1, XAResource.TMSUCCESS); |
| _xaResource.commit(xid1, true); |
| } |
| catch (XAException e) |
| { |
| fail("Exception thrown when commiting transaction with xid1"); |
| } |
| _logger.debug("We should now be able to receive the first and second message"); |
| try |
| { |
| TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); |
| if (message1 == null) |
| { |
| fail("did not receive first message as expected "); |
| } |
| else |
| { |
| if (message1.getLongProperty(_sequenceNumberPropertyName) != 2) |
| { |
| fail("receive wrong message its sequence number is: " + message1 |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| message1 = (TextMessage) nonXAConsumer.receive(1000); |
| if (message1 == null) |
| { |
| fail("did not receive first message as expected "); |
| } |
| else |
| { |
| if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) |
| { |
| fail("receive wrong message its sequence number is: " + message1 |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _logger.debug("commit transacted session"); |
| nonXASession.commit(); |
| _logger.debug("Test that the topic is now empty"); |
| message1 = (TextMessage) nonXAConsumer.receive(1000); |
| if (message1 != null) |
| { |
| fail("receive an unexpected message "); |
| } |
| } |
| catch (JMSException e) |
| { |
| fail("Exception thrown when emptying the queue: " + e.getMessage()); |
| } |
| } |
| catch (JMSException e) |
| { |
| fail("cannot create standard consumer: " + e.getMessage()); |
| } |
| } |
| } |
| |
| |
| /** |
| * strategy: Produce a message within Tx1 and commit tx1. consume this message within tx2 and abort tx2. |
| * Consume the same message within tx3 and commit it. Check that no more message is available. |
| */ |
| public void testDurSub() |
| { |
| if (!isBroker08()) |
| { |
| Xid xid1 = getNewXid(); |
| Xid xid2 = getNewXid(); |
| Xid xid3 = getNewXid(); |
| Xid xid4 = getNewXid(); |
| String durSubName = "xaSubDurable"; |
| try |
| { |
| TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); |
| try |
| { |
| _topicConnection.start(); |
| _logger.debug("start xid1"); |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| // start the connection |
| _topicConnection.start(); |
| _logger.debug("produce a message with sequence number 1"); |
| _message.setLongProperty(_sequenceNumberPropertyName, 1); |
| _producer.send(_message); |
| _logger.debug("2 phases commit xid1"); |
| _xaResource.end(xid1, XAResource.TMSUCCESS); |
| if (_xaResource.prepare(xid1) != XAResource.XA_OK) |
| { |
| fail("Problem when preparing tx1 "); |
| } |
| _xaResource.commit(xid1, false); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid1", e); |
| fail("Exception when working with xid1: " + e.getMessage()); |
| } |
| try |
| { |
| _logger.debug("start xid2"); |
| _xaResource.start(xid2, XAResource.TMNOFLAGS); |
| _logger.debug("receive the previously produced message"); |
| TextMessage message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| _logger.debug("rollback xid2"); |
| boolean rollbackOnFailure = false; |
| try |
| { |
| _xaResource.end(xid2, XAResource.TMFAIL); |
| } |
| catch (XAException e) |
| { |
| if (e.errorCode != XAException.XA_RBROLLBACK) |
| { |
| fail("Exception when working with xid2: " + e.getMessage()); |
| } |
| rollbackOnFailure = true; |
| } |
| if (!rollbackOnFailure) |
| { |
| if (_xaResource.prepare(xid2) != XAResource.XA_OK) |
| { |
| fail("Problem when preparing tx2 "); |
| } |
| _xaResource.rollback(xid2); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid2", e); |
| fail("Exception when working with xid2: " + e.getMessage()); |
| } |
| try |
| { |
| _logger.debug("start xid3"); |
| _xaResource.start(xid3, XAResource.TMNOFLAGS); |
| _logger.debug(" receive the previously aborted consumed message"); |
| TextMessage message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| _logger.debug("commit xid3"); |
| _xaResource.end(xid3, XAResource.TMSUCCESS); |
| if (_xaResource.prepare(xid3) != XAResource.XA_OK) |
| { |
| fail("Problem when preparing tx3 "); |
| } |
| _xaResource.commit(xid3, false); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid3", e); |
| fail("Exception when working with xid3: " + e.getMessage()); |
| } |
| try |
| { |
| _logger.debug("start xid4"); |
| _xaResource.start(xid4, XAResource.TMNOFLAGS); |
| _logger.debug("check that topic is empty"); |
| TextMessage message = (TextMessage) xaDurSub.receive(1000); |
| if (message != null) |
| { |
| fail("An unexpected message was received "); |
| } |
| _logger.debug("commit xid4"); |
| _xaResource.end(xid4, XAResource.TMSUCCESS); |
| _xaResource.commit(xid4, true); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid4", e); |
| fail("Exception when working with xid4: " + e.getMessage()); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("problem when creating dur sub", e); |
| fail("problem when creating dur sub: " + e.getMessage()); |
| } |
| finally |
| { |
| try |
| { |
| _session.unsubscribe(durSubName); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("problem when unsubscribing dur sub", e); |
| fail("problem when unsubscribing dur sub: " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * strategy: create a XA durable subscriber dusSub, produce 7 messages with the standard session, |
| * consume 2 messages respectively with tx1, tx2 and tx3 |
| * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! |
| * commit tx3 |
| * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! |
| * start tx4 and consume messages 1 - 4 and 7 |
| * commit tx4 |
| * Now the topic should be empty! |
| */ |
| public void testMultiMessagesDurSub() |
| { |
| if (!isBroker08()) |
| { |
| Xid xid1 = getNewXid(); |
| Xid xid2 = getNewXid(); |
| Xid xid3 = getNewXid(); |
| Xid xid4 = getNewXid(); |
| Xid xid6 = getNewXid(); |
| String durSubName = "xaSubDurable"; |
| TextMessage message; |
| try |
| { |
| TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); |
| try |
| { |
| Session txSession = _nonXASession; |
| MessageProducer txProducer = txSession.createProducer(_topic); |
| _logger.debug("produce 10 persistent messages"); |
| txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); |
| _topicConnection.start(); |
| for (int i = 1; i <= 7; i++) |
| { |
| _message.setLongProperty(_sequenceNumberPropertyName, i); |
| txProducer.send(_message); |
| } |
| // commit txSession |
| txSession.commit(); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Exception thrown when producing messages", e); |
| fail("Exception thrown when producing messages: " + e.getMessage()); |
| } |
| |
| try |
| { |
| _logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3"); |
| //----- start xid1 |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| // receive the 2 first messages |
| for (int i = 1; i <= 2; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid1, XAResource.TMSUSPEND); |
| //----- start xid2 |
| _xaResource.start(xid2, XAResource.TMNOFLAGS); |
| // receive the 2 first messages |
| for (int i = 3; i <= 4; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid2, XAResource.TMSUSPEND); |
| //----- start xid3 |
| _xaResource.start(xid3, XAResource.TMNOFLAGS); |
| // receive the 2 first messages |
| for (int i = 5; i <= 6; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid3, XAResource.TMSUCCESS); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown when consumming 6 first messages", e); |
| fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); |
| } |
| try |
| { |
| _logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7"); |
| _xaResource.start(xid2, XAResource.TMRESUME); |
| _xaResource.end(xid2, XAResource.TMSUCCESS); |
| _xaResource.prepare(xid2); |
| _xaResource.rollback(xid2); |
| // receive 3 message within tx1: 3, 4 and 7 |
| _xaResource.start(xid1, XAResource.TMRESUME); |
| _logger.debug(" 3, 4 and 7"); |
| for (int i = 1; i <= 3; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + 3); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message |
| .getLongProperty(_sequenceNumberPropertyName) || message |
| .getLongProperty(_sequenceNumberPropertyName) == 6) |
| { |
| fail("wrong sequence number: " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown when consumming message: 3, 4 and 7", e); |
| fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); |
| } |
| |
| try |
| { |
| _xaResource.end(xid1, XAResource.TMSUCCESS); |
| _logger.debug(" commit tx3"); |
| _xaResource.commit(xid3, true); |
| _logger.debug("abort tx1"); |
| _xaResource.prepare(xid1); |
| _xaResource.rollback(xid1); |
| } |
| catch (XAException e) |
| { |
| _logger.error("XAException thrown when committing tx3 or aborting tx1", e); |
| fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); |
| } |
| |
| try |
| { |
| // consume messages 1 - 4 + 7 |
| //----- start xid1 |
| _xaResource.start(xid4, XAResource.TMNOFLAGS); |
| for (int i = 1; i <= 5; i++) |
| { |
| |
| message = (TextMessage) xaDurSub.receive(1000); |
| |
| if(message != null) |
| { |
| _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message |
| .getLongProperty(_sequenceNumberPropertyName) == 6) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid4, XAResource.TMSUCCESS); |
| _xaResource.prepare(xid4); |
| _xaResource.commit(xid4, false); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown in last phase", e); |
| fail("Exception thrown in last phase: " + e.getMessage()); |
| } |
| // now the topic should be empty!! |
| try |
| { |
| // start xid6 |
| _xaResource.start(xid6, XAResource.TMNOFLAGS); |
| // should now be empty |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message != null) |
| { |
| fail("An unexpected message was received " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| // commit xid6 |
| _xaResource.end(xid6, XAResource.TMSUCCESS); |
| _xaResource.commit(xid6, true); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid6", e); |
| fail("Exception when working with xid6: " + e.getMessage()); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("problem when creating dur sub", e); |
| fail("problem when creating dur sub: " + e.getMessage()); |
| } |
| finally |
| { |
| try |
| { |
| _session.unsubscribe(durSubName); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("problem when unsubscribing dur sub", e); |
| fail("problem when unsubscribing dur sub: " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * strategy: create a XA durable subscriber dusSub, produce 10 messages with the standard session, |
| * consume 2 messages respectively with tx1, tx2 and tx3 |
| * prepare xid2 and xid3 |
| * crash the server |
| * Redo the job for xid1 that has been aborted by server crash |
| * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! |
| * commit tx3 |
| * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! |
| * start tx4 and consume messages 1 - 4 |
| * start tx5 and consume messages 7 - 10 |
| * abort tx4 |
| * consume messages 1-4 with tx5 |
| * commit tx5 |
| * Now the topic should be empty! |
| */ |
| public void testMultiMessagesDurSubCrash() |
| { |
| if (!isBroker08()) |
| { |
| Xid xid1 = getNewXid(); |
| Xid xid2 = getNewXid(); |
| Xid xid3 = getNewXid(); |
| Xid xid4 = getNewXid(); |
| Xid xid5 = getNewXid(); |
| Xid xid6 = getNewXid(); |
| String durSubName = "xaSubDurable"; |
| TextMessage message; |
| try |
| { |
| TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); |
| try |
| { |
| Session txSession = _nonXASession; |
| MessageProducer txProducer = txSession.createProducer(_topic); |
| // produce 10 persistent messages |
| txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); |
| _topicConnection.start(); |
| for (int i = 1; i <= 10; i++) |
| { |
| _message.setLongProperty(_sequenceNumberPropertyName, i); |
| txProducer.send(_message); |
| } |
| // commit txSession |
| txSession.commit(); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Exception thrown when producing messages", e); |
| fail("Exception thrown when producing messages: " + e.getMessage()); |
| } |
| try |
| { |
| // consume 2 messages respectively with tx1, tx2 and tx3 |
| //----- start xid1 |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| // receive the 2 first messages |
| for (int i = 1; i <= 2; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid1, XAResource.TMSUCCESS); |
| //----- start xid2 |
| _xaResource.start(xid2, XAResource.TMNOFLAGS); |
| // receive the 2 first messages |
| for (int i = 3; i <= 4; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid2, XAResource.TMSUCCESS); |
| //----- start xid3 |
| _xaResource.start(xid3, XAResource.TMNOFLAGS); |
| // receive the 2 first messages |
| for (int i = 5; i <= 6; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid3, XAResource.TMSUCCESS); |
| // prepare tx2 and tx3 |
| |
| _xaResource.prepare(xid2); |
| _xaResource.prepare(xid3); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown when consumming 6 first messages", e); |
| fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); |
| } |
| /////// stop the broker now !! |
| try |
| { |
| restartDefaultBroker(); |
| init(); |
| } |
| catch (Exception e) |
| { |
| fail("Exception when stopping and restarting the server"); |
| } |
| // get the list of in doubt transactions |
| try |
| { |
| _topicConnection.start(); |
| // reconnect to dursub! |
| xaDurSub = _session.createDurableSubscriber(_topic, durSubName); |
| Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); |
| if (inDoubt == null) |
| { |
| fail("the array of in doubt transactions should not be null "); |
| } |
| // At that point we expect only two indoubt transactions: |
| if (inDoubt.length != 2) |
| { |
| fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); |
| } |
| } |
| catch (XAException e) |
| { |
| _logger.error("exception thrown when recovering transactions", e); |
| fail("exception thrown when recovering transactions " + e.getMessage()); |
| } |
| try |
| { |
| // xid1 has been aborted redo the job! |
| // consume 2 messages with tx1 |
| //----- start xid1 |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| // receive the 2 first messages |
| for (int i = 1; i <= 2; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| _xaResource.end(xid1, XAResource.TMSUSPEND); |
| // abort tx2, we now expect to receive messages 3 and 4 first! |
| _xaResource.rollback(xid2); |
| |
| // receive 3 message within tx1: 3, 4 and 7 |
| _xaResource.start(xid1, XAResource.TMRESUME); |
| // receive messages 3, 4 and 7 |
| Set<Long> expected = new HashSet<Long>(); |
| expected.add(3L); |
| expected.add(4L); |
| expected.add(7L); |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected one of: " + expected); |
| } |
| else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) |
| { |
| fail("wrong sequence number: " + message |
| .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); |
| } |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected one of: " + expected); |
| } |
| else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) |
| { |
| |
| fail("wrong sequence number: " + message |
| .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); |
| } |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected one of: " + expected); |
| } |
| else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) |
| { |
| fail("wrong sequence number: " + message |
| .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown when consumming message: 3, 4 and 7", e); |
| fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); |
| } |
| |
| try |
| { |
| _xaResource.end(xid1, XAResource.TMSUSPEND); |
| // commit tx3 |
| _xaResource.commit(xid3, false); |
| // abort tx1 |
| _xaResource.prepare(xid1); |
| _xaResource.rollback(xid1); |
| } |
| catch (XAException e) |
| { |
| _logger.error("XAException thrown when committing tx3 or aborting tx1", e); |
| fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); |
| } |
| |
| try |
| { |
| // consume messages: could be any from (1 - 4, 7-10) |
| //----- start xid4 |
| Set<Long> expected = new HashSet<Long>(); |
| Set<Long> xid4msgs = new HashSet<Long>(); |
| for(long l = 1; l <= 4l; l++) |
| { |
| expected.add(l); |
| } |
| for(long l = 7; l <= 10l; l++) |
| { |
| expected.add(l); |
| } |
| _xaResource.start(xid4, XAResource.TMNOFLAGS); |
| for (int i = 1; i <= 4; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| |
| long seqNo = message.getLongProperty(_sequenceNumberPropertyName); |
| xid4msgs.add(seqNo); |
| |
| if (!expected.remove(seqNo)) |
| { |
| fail("wrong sequence number: " + seqNo + |
| " expected one from " + expected); |
| } |
| } |
| _xaResource.end(xid4, XAResource.TMSUSPEND); |
| // consume messages 8 - 10 |
| _xaResource.start(xid5, XAResource.TMNOFLAGS); |
| for (int i = 7; i <= 10; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName) |
| + " expected one from " + expected); |
| } |
| } |
| _xaResource.end(xid5, XAResource.TMSUSPEND); |
| // abort tx4 |
| _xaResource.prepare(xid4); |
| _xaResource.rollback(xid4); |
| expected.addAll(xid4msgs); |
| // consume messages 1-4 with tx5 |
| _xaResource.start(xid5, XAResource.TMRESUME); |
| for (int i = 1; i <= 4; i++) |
| { |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received! expected: " + i); |
| } |
| else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName) |
| + " expected one from " + expected); |
| } |
| } |
| _xaResource.end(xid5, XAResource.TMSUSPEND); |
| // commit tx5 |
| |
| _xaResource.prepare(xid5); |
| _xaResource.commit(xid5, false); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown in last phase", e); |
| fail("Exception thrown in last phase: " + e.getMessage()); |
| } |
| // now the topic should be empty!! |
| try |
| { |
| // start xid6 |
| _xaResource.start(xid6, XAResource.TMNOFLAGS); |
| // should now be empty |
| message = (TextMessage) xaDurSub.receive(1000); |
| if (message != null) |
| { |
| fail("An unexpected message was received " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| // commit xid6 |
| _xaResource.end(xid6, XAResource.TMSUSPEND); |
| _xaResource.commit(xid6, true); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid6", e); |
| fail("Exception when working with xid6: " + e.getMessage()); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("problem when creating dur sub", e); |
| fail("problem when creating dur sub: " + e.getMessage()); |
| } |
| finally |
| { |
| try |
| { |
| _session.unsubscribe(durSubName); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("problem when unsubscribing dur sub", e); |
| fail("problem when unsubscribing dur sub: " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * strategy: Produce a message within Tx1 and commit tx1. a durable subscriber then receives that message within tx2 |
| * that is then prepared. |
| * Shutdown the server and get the list of in doubt transactions: |
| * we expect tx2, Tx2 is aborted and the message consumed within tx3 that is committed we then check that the topic is empty. |
| */ |
| public void testDurSubCrash() |
| { |
| if (!isBroker08()) |
| { |
| Xid xid1 = getNewXid(); |
| Xid xid2 = getNewXid(); |
| Xid xid3 = getNewXid(); |
| Xid xid4 = getNewXid(); |
| String durSubName = "xaSubDurable"; |
| try |
| { |
| TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); |
| try |
| { |
| _topicConnection.start(); |
| //----- start xid1 |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| // start the connection |
| _topicConnection.start(); |
| // produce a message with sequence number 1 |
| _message.setLongProperty(_sequenceNumberPropertyName, 1); |
| _producer.send(_message); |
| // commit |
| _xaResource.end(xid1, XAResource.TMSUCCESS); |
| if (_xaResource.prepare(xid1) != XAResource.XA_OK) |
| { |
| fail("Problem when preparing tx1 "); |
| } |
| _xaResource.commit(xid1, false); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid1", e); |
| fail("Exception when working with xid1: " + e.getMessage()); |
| } |
| try |
| { |
| // start xid2 |
| _xaResource.start(xid2, XAResource.TMNOFLAGS); |
| // receive the previously produced message |
| TextMessage message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| // prepare xid2 |
| _xaResource.end(xid2, XAResource.TMSUCCESS); |
| if (_xaResource.prepare(xid2) != XAResource.XA_OK) |
| { |
| fail("Problem when preparing tx2 "); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid2", e); |
| fail("Exception when working with xid2: " + e.getMessage()); |
| } |
| |
| /////// stop the server now !! |
| try |
| { |
| restartDefaultBroker(); |
| init(); |
| } |
| catch (Exception e) |
| { |
| fail("Exception when stopping and restarting the server"); |
| } |
| |
| // get the list of in doubt transactions |
| try |
| { |
| _topicConnection.start(); |
| // reconnect to dursub! |
| xaDurSub = _session.createDurableSubscriber(_topic, durSubName); |
| Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); |
| if (inDoubt == null) |
| { |
| fail("the array of in doubt transactions should not be null "); |
| } |
| // At that point we expect only two indoubt transactions: |
| if (inDoubt.length != 1) |
| { |
| fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); |
| } |
| |
| // commit them |
| for (Xid anInDoubt : inDoubt) |
| { |
| if (anInDoubt.equals(xid2)) |
| { |
| _logger.info("aborting xid2 "); |
| try |
| { |
| _xaResource.rollback(anInDoubt); |
| } |
| catch (Exception e) |
| { |
| _logger.error("exception when aborting xid2 ", e); |
| fail("exception when aborting xid2 "); |
| } |
| } |
| else |
| { |
| _logger.info("XID2 is not in doubt "); |
| } |
| } |
| } |
| catch (XAException e) |
| { |
| _logger.error("exception thrown when recovering transactions", e); |
| fail("exception thrown when recovering transactions " + e.getMessage()); |
| } |
| |
| try |
| { |
| // start xid3 |
| _xaResource.start(xid3, XAResource.TMNOFLAGS); |
| // receive the previously produced message and aborted |
| TextMessage message = (TextMessage) xaDurSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| // commit xid3 |
| _xaResource.end(xid3, XAResource.TMSUCCESS); |
| if (_xaResource.prepare(xid3) != XAResource.XA_OK) |
| { |
| fail("Problem when preparing tx3 "); |
| } |
| _xaResource.commit(xid3, false); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid3", e); |
| fail("Exception when working with xid3: " + e.getMessage()); |
| } |
| try |
| { |
| // start xid4 |
| _xaResource.start(xid4, XAResource.TMNOFLAGS); |
| // should now be empty |
| TextMessage message = (TextMessage) xaDurSub.receive(1000); |
| if (message != null) |
| { |
| fail("An unexpected message was received " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| // commit xid4 |
| _xaResource.end(xid4, XAResource.TMSUCCESS); |
| _xaResource.commit(xid4, true); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception when working with xid4", e); |
| fail("Exception when working with xid4: " + e.getMessage()); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("problem when creating dur sub", e); |
| fail("problem when creating dur sub: " + e.getMessage()); |
| } |
| finally |
| { |
| try |
| { |
| _session.unsubscribe(durSubName); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("problem when unsubscribing dur sub", e); |
| fail("problem when unsubscribing dur sub: " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * strategy: Produce a message within Tx1 and prepare tx1. Shutdown the server and get the list of indoubt transactions: |
| * we expect tx1, Tx1 is committed so we expect the test topic not to be empty! |
| */ |
| public void testRecover() |
| { |
| if (!isBroker08()) |
| { |
| Xid xid1 = getNewXid(); |
| String durSubName = "test1"; |
| try |
| { |
| // create a dummy durable subscriber to be sure that messages are persisted! |
| _nonXASession.createDurableSubscriber(_topic, durSubName); |
| // start the xaResource for xid1 |
| try |
| { |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| } |
| catch (XAException e) |
| { |
| fail("cannot start the transaction with xid1: " + e.getMessage()); |
| } |
| try |
| { |
| // start the connection |
| _topicConnection.start(); |
| // produce a message with sequence number 1 |
| _message.setLongProperty(_sequenceNumberPropertyName, 1); |
| _producer.send(_message); |
| } |
| catch (JMSException e) |
| { |
| fail(" cannot send persistent message: " + e.getMessage()); |
| } |
| // suspend the transaction |
| try |
| { |
| _xaResource.end(xid1, XAResource.TMSUCCESS); |
| } |
| catch (XAException e) |
| { |
| fail("Cannot end the transaction with xid1: " + e.getMessage()); |
| } |
| // prepare the transaction with xid1 |
| try |
| { |
| _xaResource.prepare(xid1); |
| } |
| catch (XAException e) |
| { |
| fail("Exception when preparing xid1: " + e.getMessage()); |
| } |
| |
| /////// stop the server now !! |
| try |
| { |
| restartDefaultBroker(); |
| init(); |
| } |
| catch (Exception e) |
| { |
| fail("Exception when stopping and restarting the server"); |
| } |
| |
| try |
| { |
| MessageConsumer nonXAConsumer = _nonXASession.createDurableSubscriber(_topic, durSubName); |
| _topicConnection.start(); |
| // get the list of in doubt transactions |
| try |
| { |
| Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); |
| if (inDoubt == null) |
| { |
| fail("the array of in doubt transactions should not be null "); |
| } |
| // At that point we expect only two indoubt transactions: |
| if (inDoubt.length != 1) |
| { |
| fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); |
| } |
| // commit them |
| for (Xid anInDoubt : inDoubt) |
| { |
| if (anInDoubt.equals(xid1)) |
| { |
| _logger.debug("committing xid1 "); |
| try |
| { |
| _xaResource.commit(anInDoubt, false); |
| } |
| catch (Exception e) |
| { |
| _logger.error("PB when aborted xid1"); |
| fail("exception when committing xid1 "); |
| } |
| } |
| else |
| { |
| _logger.debug("XID1 is not in doubt "); |
| } |
| } |
| } |
| catch (XAException e) |
| { |
| _logger.error("exception thrown when recovering transactions ", e); |
| fail("exception thrown when recovering transactions " + e.getMessage()); |
| } |
| _logger.debug("the topic should not be empty"); |
| TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); |
| if (message1 == null) |
| { |
| fail("The topic is empty! "); |
| } |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown when testin that queue test is empty", e); |
| fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); |
| } |
| } |
| catch (JMSException e) |
| { |
| _logger.error("cannot create dummy durable subscriber", e); |
| fail("cannot create dummy durable subscriber: " + e.getMessage()); |
| } |
| finally |
| { |
| try |
| { |
| // unsubscribe the dummy durable subscriber |
| TopicSession nonXASession = _nonXASession; |
| nonXASession.unsubscribe(durSubName); |
| } |
| catch (JMSException e) |
| { |
| fail("cannot unsubscribe durable subscriber: " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * strategy: |
| * create a standard durable subscriber |
| * produce 3 messages |
| * consume the first message with that durable subscriber |
| * close the standard session that deactivates the durable subscriber |
| * migrate the durable subscriber to an xa one |
| * consume the second message with that xa durable subscriber |
| * close the xa session that deactivates the durable subscriber |
| * reconnect to the durable subscriber with a standard session |
| * consume the two remaining messages and check that the topic is empty! |
| */ |
| public void testMigrateDurableSubscriber() |
| { |
| if (!isBroker08()) |
| { |
| Xid xid1 = getNewXid(); |
| Xid xid2 = getNewXid(); |
| String durSubName = "DurableSubscriberMigrate"; |
| try |
| { |
| Session stSession = _nonXASession; |
| MessageProducer producer = stSession.createProducer(_topic); |
| _logger.debug("Create a standard durable subscriber!"); |
| TopicSubscriber durSub = stSession.createDurableSubscriber(_topic, durSubName); |
| TopicSubscriber durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); |
| TextMessage message; |
| producer.setDeliveryMode(DeliveryMode.PERSISTENT); |
| _topicConnection.start(); |
| _logger.debug("produce 3 messages"); |
| for (int i = 1; i <= 3; i++) |
| { |
| _message.setLongProperty(_sequenceNumberPropertyName, i); |
| //producer.send( _message ); |
| producer.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); |
| stSession.commit(); |
| } |
| _logger.debug("consume the first message with that durable subscriber"); |
| message = (TextMessage) durSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) |
| { |
| fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); |
| } |
| // commit the standard session |
| stSession.commit(); |
| _logger.debug("first message consumed "); |
| // close the session that deactivates the durable subscriber |
| stSession.close(); |
| _logger.debug("migrate the durable subscriber to an xa one"); |
| _xaResource.start(xid1, XAResource.TMNOFLAGS); |
| durSub = _session.createDurableSubscriber(_topic, durSubName); |
| _logger.debug(" consume the second message with that xa durable subscriber and abort it"); |
| message = (TextMessage) durSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) |
| { |
| _logger.info("wrong sequence number, 2 expected, received: " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| _xaResource.end(xid1, XAResource.TMSUCCESS); |
| _xaResource.prepare(xid1); |
| _xaResource.rollback(xid1); |
| _logger.debug("close the session that deactivates the durable subscriber"); |
| _session.close(); |
| _logger.debug("create a new standard session"); |
| stSession = _topicConnection.createTopicSession(true, 1); |
| _logger.debug("reconnect to the durable subscriber"); |
| durSub = stSession.createDurableSubscriber(_topic, durSubName); |
| durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); |
| _logger.debug("Reconnected to durablse subscribers"); |
| _logger.debug(" consume the 2 remaining messages"); |
| message = (TextMessage) durSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) |
| { |
| _logger.info("wrong sequence number, 2 expected, received: " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| // consume the third message with that xa durable subscriber |
| message = (TextMessage) durSub.receive(1000); |
| if (message == null) |
| { |
| fail("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) |
| { |
| _logger.info("wrong sequence number, 3 expected, received: " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| stSession.commit(); |
| _logger.debug("the topic should be empty now"); |
| message = (TextMessage) durSub.receive(1000); |
| if (message != null) |
| { |
| fail("Received unexpected message "); |
| } |
| stSession.commit(); |
| _logger.debug(" use dursub1 to receive all the 3 messages"); |
| for (int i = 1; i <= 3; i++) |
| { |
| message = (TextMessage) durSub1.receive(1000); |
| if (message == null) |
| { |
| _logger.debug("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| fail("wrong sequence number, " + i + " expected, received: " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| stSession.commit(); |
| // send a non persistent message to check that all persistent messages are deleted |
| producer = stSession.createProducer(_topic); |
| producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); |
| producer.send(_message); |
| stSession.commit(); |
| message = (TextMessage) durSub.receive(1000); |
| if (message == null) |
| { |
| fail("message not received "); |
| } |
| message = (TextMessage) durSub1.receive(1000); |
| if (message == null) |
| { |
| fail("message not received "); |
| } |
| stSession.commit(); |
| stSession.close(); |
| _logger.debug(" now create a standard non transacted session and reconnect to the durable xubscriber"); |
| TopicConnection stConnection = |
| _topicConnection; //_topicFactory.createTopicConnection("guest", "guest"); |
| TopicSession autoAclSession = stConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); |
| TopicPublisher publisher = autoAclSession.createPublisher(_topic); |
| durSub = autoAclSession.createDurableSubscriber(_topic, durSubName); |
| stConnection.start(); |
| // produce 3 persistent messages |
| for (int i = 1; i <= 3; i++) |
| { |
| _message.setLongProperty(_sequenceNumberPropertyName, i); |
| //producer.send( _message ); |
| publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); |
| } |
| _logger.debug(" use dursub to receive all the 3 messages"); |
| for (int i = 1; i <= 3; i++) |
| { |
| message = (TextMessage) durSub.receive(1000); |
| if (message == null) |
| { |
| _logger.info("no message received "); |
| } |
| else if (message.getLongProperty(_sequenceNumberPropertyName) != i) |
| { |
| _logger.info("wrong sequence number, " + i + " expected, received: " + message |
| .getLongProperty(_sequenceNumberPropertyName)); |
| } |
| } |
| |
| _logger.debug("now set a message listener"); |
| AtomicBoolean lock = new AtomicBoolean(true); |
| reset(); |
| stConnection.stop(); |
| durSub.setMessageListener(new TopicListener(1, 3, lock)); |
| _logger.debug(" produce 3 persistent messages"); |
| for (int i = 1; i <= 3; i++) |
| { |
| _message.setLongProperty(_sequenceNumberPropertyName, i); |
| //producer.send( _message ); |
| publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); |
| } |
| // start the connection |
| stConnection.start(); |
| while (lock.get()) |
| { |
| synchronized (lock) |
| { |
| lock.wait(); |
| } |
| } |
| if (getFailureStatus()) |
| { |
| fail("problem with message listener"); |
| } |
| stConnection.stop(); |
| durSub.setMessageListener(null); |
| _logger.debug(" do the same with an xa session"); |
| // produce 3 persistent messages |
| for (int i = 1; i <= 3; i++) |
| { |
| _message.setLongProperty(_sequenceNumberPropertyName, i); |
| //producer.send( _message ); |
| publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); |
| } |
| //stConnection.close(); |
| autoAclSession.close(); |
| _logger.debug(" migrate the durable subscriber to an xa one"); |
| _session = _topicConnection.createXATopicSession(); |
| _xaResource = _session.getXAResource(); |
| _xaResource.start(xid2, XAResource.TMNOFLAGS); |
| durSub = _session.createDurableSubscriber(_topic, durSubName); |
| lock = new AtomicBoolean(); |
| reset(); |
| _topicConnection.stop(); |
| durSub.setMessageListener(new TopicListener(1, 3, lock)); |
| // start the connection |
| _topicConnection.start(); |
| while (lock.get()) |
| { |
| synchronized (lock) |
| { |
| lock.wait(); |
| } |
| } |
| if (getFailureStatus()) |
| { |
| fail("problem with XA message listener"); |
| } |
| _xaResource.end(xid2, XAResource.TMSUCCESS); |
| _xaResource.commit(xid2, true); |
| _session.close(); |
| } |
| catch (Exception e) |
| { |
| _logger.error("Exception thrown", e); |
| fail("Exception thrown: " + e.getMessage()); |
| } |
| finally |
| { |
| try |
| { |
| _topicConnection.createXASession().unsubscribe(durSubName); |
| _topicConnection.createXASession().unsubscribe(durSubName + "_second"); |
| } |
| catch (JMSException e) |
| { |
| fail("Exception thrown when unsubscribing durable subscriber " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| /** -------------------------------------------------------------------------------------- **/ |
| /** ----------------------------- Utility methods --------------------------------------- **/ |
| /** -------------------------------------------------------------------------------------- **/ |
| |
| /** |
| * get a new queue connection |
| * |
| * @return a new queue connection |
| * @throws javax.jms.JMSException If the JMS provider fails to create the queue connection |
| * due to some internal error or in case of authentication failure |
| */ |
| private XATopicConnection getNewTopicXAConnection() throws JMSException |
| { |
| return _topicFactory.createXATopicConnection("guest", "guest"); |
| } |
| |
| public static void failure() |
| { |
| _failure = true; |
| } |
| |
| public static void reset() |
| { |
| _failure = false; |
| } |
| |
| public static boolean getFailureStatus() |
| { |
| return _failure; |
| } |
| |
| private class TopicListener implements MessageListener |
| { |
| private long _counter; |
| private long _end; |
| private final AtomicBoolean _lock; |
| |
| public TopicListener(long init, long end, AtomicBoolean lock) |
| { |
| _counter = init; |
| _end = end; |
| _lock = lock; |
| } |
| |
| public void onMessage(Message message) |
| { |
| long seq = 0; |
| try |
| { |
| seq = message.getLongProperty(TopicTest._sequenceNumberPropertyName); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error getting long property: " + TopicTest._sequenceNumberPropertyName , e); |
| TopicTest.failure(); |
| _lock.set(false); |
| synchronized (_lock) |
| { |
| _lock.notifyAll(); |
| } |
| } |
| if (seq != _counter) |
| { |
| _logger.info("received message " + seq + " expected " + _counter); |
| TopicTest.failure(); |
| _lock.set(false); |
| synchronized (_lock) |
| { |
| _lock.notifyAll(); |
| } |
| } |
| _counter++; |
| if (_counter > _end) |
| { |
| _lock.set(false); |
| synchronized (_lock) |
| { |
| _lock.notifyAll(); |
| } |
| } |
| } |
| } |
| |
| } |