QPIDJMS-519 Correctly recover transaction after failover
Adds additional guards against allowing a send or acknowledge
to occur outside the bounds of a transaction when recovering a
connection following failover. Adds tests to ensure that the
transaction becomes in-doubt whenever the state of recovery
cannot ensure that there will be no lost work or untransacted
work. Test added to cover cases where this was previously seen.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 0b0b370..e659834 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -16,8 +16,10 @@
*/
package org.apache.qpid.jms;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
@@ -43,7 +45,7 @@
private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class);
- private final Map<JmsResourceId, JmsResourceId> participants = new HashMap<JmsResourceId, JmsResourceId>();
+ private final Map<JmsResourceId, JmsResourceId> participants = Collections.synchronizedMap(new HashMap<JmsResourceId, JmsResourceId>());
private final JmsSession session;
private final JmsConnection connection;
private JmsTransactionInfo transactionInfo;
@@ -61,6 +63,9 @@
lock.readLock().lock();
try {
if (isInDoubt()) {
+ // Prevent recovery from reseting transaction to normal operating state.
+ participants.put(envelope.getProducerId(), envelope.getProducerId());
+
// Need to signal that the request is going to pass before completing
if (outcome != null) {
outcome.onPendingSuccess();
@@ -69,6 +74,7 @@
if (envelope.isCompletionRequired()) {
connection.onCompletedMessageSend(envelope);
}
+
return;
}
@@ -137,6 +143,8 @@
try {
reset();
final JmsTransactionInfo transactionInfo = getNextTransactionInfo();
+
+ LOG.debug("Initiating Begin of txn: {}", transactionInfo.getId());
connection.createResource(transactionInfo, new ProviderSynchronization() {
@Override
@@ -159,7 +167,7 @@
}
}
- LOG.debug("Begin: {}", transactionInfo.getId());
+ LOG.trace("Completed Begin of txn: {}", transactionInfo.getId());
} finally {
lock.writeLock().unlock();
}
@@ -179,7 +187,7 @@
} else {
LOG.debug("Commit: {}", transactionInfo.getId());
- JmsTransactionId oldTransactionId = transactionInfo.getId();
+ final JmsTransactionId oldTransactionId = transactionInfo.getId();
final JmsTransactionInfo nextTx = getNextTransactionInfo();
try {
@@ -251,13 +259,14 @@
private void doRollback(boolean startNewTx) throws JMSException {
lock.writeLock().lock();
try {
- if(transactionInfo == null) {
+ if (transactionInfo == null) {
return;
}
LOG.debug("Rollback: {}", transactionInfo.getId());
- JmsTransactionId oldTransactionId = transactionInfo.getId();
+ final JmsTransactionId oldTransactionId = transactionInfo.getId();
final JmsTransactionInfo nextTx;
+
if (startNewTx) {
nextTx = getNextTransactionInfo();
} else {
@@ -335,7 +344,7 @@
public void onConnectionInterrupted() {
lock.writeLock().tryLock();
try {
- if(transactionInfo != null) {
+ if (transactionInfo != null) {
transactionInfo.setInDoubt(true);
}
} finally {
@@ -347,25 +356,34 @@
@Override
public void onConnectionRecovery(Provider provider) throws Exception {
- // If we get the lock then no TX commit / rollback / begin is in progress
- // otherwise one is and we can only assume that it should fail given the
- // connection was dropped.
- if (lock.writeLock().tryLock()) {
+ if (lock.writeLock().tryLock(5, TimeUnit.MILLISECONDS)) {
+ // If we got the lock then there is no pending commit / rollback / begin / send or
+ // acknowledgement so we can safely create a new transaction, if there is work pending
+ // on the current transaction we must mark it as in-doubt so that a commit attempt
+ // will then roll it back. In all other cases the transaction should be marked as
+ // in-doubt if not already done so as this currently runs outside the IO thread and
+ // cannot guard against sudden appearance of sends or acks within the transaction.
try {
- // If we got the lock then there is no pending commit / rollback / begin so
- // we can safely create a new transaction, if there is work pending on the
- // current transaction we must mark it as in-doubt so that a commit attempt
- // will then roll it back.
+ // Session must create a new transaction on start and there could be a connection
+ // drop before that happens which means we don't need to create one yet as there
+ // wasn't one before that needs replacing.
+ if (transactionInfo == null) {
+ LOG.trace("Transaction context skipping recovery because no transaction previously existed.");
+ return;
+ }
+
transactionInfo = getNextTransactionInfo();
ProviderFuture request = provider.newProviderFuture(new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
+ LOG.trace("TX:{} Recovery of Transaction succeeded: in-doubt state: {}.", transactionInfo.getId(), !participants.isEmpty());
transactionInfo.setInDoubt(!participants.isEmpty());
}
@Override
public void onPendingFailure(ProviderException cause) {
+ LOG.trace("TX:{} Recovery of Transaction failed and current state set to in-doubt: {}.", transactionInfo.getId(), cause);
transactionInfo.setInDoubt(true);
}
});
@@ -377,11 +395,11 @@
} finally {
lock.writeLock().unlock();
}
- } else {
+ } else if (transactionInfo != null) {
+ // A previous transaction exists and a pending transaction write locked scoped operation is awaiting
+ // its chance to run within the IO thread, we don't know what work it performed in that TX so our only
+ // option is to mark it as in doubt and rolled it back on next commit.
LOG.trace("Transaction recovery marking current TX:{} as in-doubt.", transactionInfo.getId());
-
- // We did not get the lock so there is an operation in progress and our only
- // option is to mark the state as failed so a commit will roll back.
transactionInfo.setInDoubt(true);
}
}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index c5990e5..39b597c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -88,7 +88,7 @@
}
private void acknowledgeUndeliveredRecoveredMessages() {
- if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE
+ if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE
|| acknowledgementMode == Session.AUTO_ACKNOWLEDGE
|| acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE
|| acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE) {
@@ -257,7 +257,7 @@
}
JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
- if(ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
+ if (ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
handleDisposition(envelope, current, MODIFIED_FAILED);
} else if (envelope.isDelivered()) {
final DeliveryState disposition;
@@ -347,8 +347,7 @@
LOG.debug("Accepted Ack of message: {}", envelope);
if (!delivery.remotelySettled()) {
if (session.isTransacted() && !getResourceInfo().isBrowser()) {
-
- if (session.isTransactionFailed()) {
+ if (session.isTransactionInDoubt()) {
LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
return;
}
@@ -462,7 +461,7 @@
deliver(reverseIterator.previous());
}
- if(deferredClose) {
+ if (deferredClose) {
acknowledgeUndeliveredRecoveredMessages();
tryCompleteDeferredClose();
}
@@ -523,7 +522,7 @@
@Override
public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws ProviderException {
- if(delivery.getDefaultDeliveryState() == null){
+ if (delivery.getDefaultDeliveryState() == null){
delivery.setDefaultDeliveryState(Released.getInstance());
}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index ad993bd..d222dc0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -93,14 +93,18 @@
request.onFailure(new ProviderIllegalStateException("The MessageProducer is closed"));
}
+ final InFlightSend send = new InFlightSend(envelope, request);
+
if (!delayedDeliverySupported && envelope.getMessage().getFacade().isDeliveryTimeTransmitted()) {
// Don't allow sends with delay if the remote has not said it can handle them
- request.onFailure(new ProviderUnsupportedOperationException("Remote does not support delayed message delivery"));
+ send.onFailure(new ProviderUnsupportedOperationException("Remote does not support delayed message delivery"));
+ } else if (session.isTransactionInDoubt()) {
+ // If the transaction has failed due to remote termination etc then we just indicate
+ // the send has succeeded until the a new transaction is started.
+ send.onSuccess();
} else if (getEndpoint().getCredit() <= 0) {
LOG.trace("Holding Message send until credit is available.");
- InFlightSend send = new InFlightSend(envelope, request);
-
if (getSendTimeout() > JmsConnectionInfo.INFINITE) {
send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send);
}
@@ -108,14 +112,7 @@
blocked.put(envelope.getMessageId(), send);
getParent().getProvider().pumpToProtonTransport(request);
} else {
- // If the transaction has failed due to remote termination etc then we just indicate
- // the send has succeeded until the a new transaction is started.
- if (session.isTransacted() && session.isTransactionFailed()) {
- request.onSuccess();
- return;
- }
-
- doSend(envelope, new InFlightSend(envelope, request));
+ doSend(envelope, send);
}
}
@@ -190,7 +187,7 @@
try {
// If the transaction has failed due to remote termination etc then we just indicate
// the send has succeeded until the a new transaction is started.
- if (session.isTransacted() && session.isTransactionFailed()) {
+ if (session.isTransacted() && session.isTransactionInDoubt()) {
held.onSuccess();
return;
}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 62d3071..111e774 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -273,8 +273,8 @@
return getResourceInfo().isTransacted();
}
- public boolean isTransactionFailed() {
- return txContext == null ? false : txContext.isTransactionFailed();
+ public boolean isTransactionInDoubt() {
+ return txContext == null ? false : txContext.isTransactionInDoubt();
}
boolean isAsyncAck() {
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index b81cee1..2415967 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -224,8 +224,10 @@
return current;
}
- public boolean isTransactionFailed() {
- return coordinator == null ? false : coordinator.isClosed();
+ public boolean isTransactionInDoubt() {
+ // A context either has an active transaction or the transaction state of all
+ // operations is in-doubt and cannot proceed as normal.
+ return coordinator == null ? true : coordinator.isClosed();
}
public Binary getAmqpTransactionId() {
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index abfd4b5..f22b018 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1733,4 +1733,63 @@
connection.close(); // Already nuked under the covers due to txn-id being missing
}
}
+
+ @Test(timeout = 30_000)
+ public void testAsyncConsumerAcksAfterCommitAndBeginWhenCommitCalledInOnMessage() throws Exception {
+ final Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+ final TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ final TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher();
+ transferStateMatcher.withTxnId(equalTo(txnId));
+ transferStateMatcher.withOutcome(nullValue());
+
+ final TransactionalStateMatcher dispositionStateMatcher = new TransactionalStateMatcher();
+ dispositionStateMatcher.withTxnId(equalTo(txnId));
+ dispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+ final TransactionalState transferTxnOutcome = new TransactionalState();
+ transferTxnOutcome.setTxnId(txnId);
+ transferTxnOutcome.setOutcome(new Accepted());
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=1000");
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+ testPeer.expectDeclare(txnId);
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+ testPeer.expectDisposition(true, dispositionStateMatcher);
+ testPeer.expectSenderAttach();
+ testPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true);
+ testPeer.expectDischarge(txnId, false);
+ testPeer.expectDeclare(txnId);
+
+ // Test that consumer onMessage delivery and a send within the listener are
+ // both included into the same transaction prior to the commit in the listener.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ messageConsumer.setMessageListener((message) -> {
+ try {
+ session.createProducer(queue).send(session.createTextMessage("sample"));
+ session.commit();
+ } catch (JMSException jmsEx) {
+ throw new RuntimeException("Behaving badly since commit already did", jmsEx);
+ }
+ });
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 899e99e..805ae70 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -90,6 +90,7 @@
import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
@@ -1645,13 +1646,15 @@
}
}
+ @Repeat(repetitions = 1)
@Test(timeout=20000)
- public void testTxRecreatedAfterConnectionFailsOver() throws Exception {
+ public void testTxRecreatedAfterConnectionFailsOverDropsAfterCoordinatorAttach() throws Exception {
doTxRecreatedAfterConnectionFailsOver(true);
}
+ @Repeat(repetitions = 1)
@Test(timeout=20000)
- public void testTxRecreatedAfterConnectionFailsOver2() throws Exception {
+ public void testTxRecreatedAfterConnectionFailsOverDropsAfterSessionBegin() throws Exception {
doTxRecreatedAfterConnectionFailsOver(false);
}
@@ -1697,13 +1700,15 @@
originalPeer.expectBegin();
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- if(dropAfterCoordinator) {
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+ if (dropAfterCoordinator) {
originalPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
- originalPeer.expectDeclare(txnId);
+ originalPeer.expectDeclare(txnId1);
}
originalPeer.dropAfterLastHandler();
@@ -1715,25 +1720,25 @@
finalPeer.expectBegin();
finalPeer.expectBegin();
finalPeer.expectCoordinatorAttach();
- finalPeer.expectDeclare(txnId);
+ finalPeer.expectDeclare(txnId2);
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded.
- finalPeer.expectDischarge(txnId, true);
+ finalPeer.expectDischarge(txnId2, true);
finalPeer.expectEnd();
+ finalPeer.expectClose();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+ LOG.debug("About to close session following final peer connection.");
session.close();
-
- // Shut it down
- finalPeer.expectClose();
+ LOG.debug("About to close connection following final peer connection.");
connection.close();
originalPeer.waitForAllHandlersToComplete(2000);
- finalPeer.waitForAllHandlersToComplete(1000);
+ finalPeer.waitForAllHandlersToComplete(2000);
}
}
@@ -2527,6 +2532,7 @@
}
}
+ @Repeat(repetitions = 1)
@Test(timeout=20000)
public void testTxCommitThrowsWhenNoDischargeResponseSentAndConnectionDrops() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
@@ -2878,9 +2884,8 @@
} catch (IllegalStateException jmsise) {
if (jmsise.getCause() != null) {
String message = jmsise.getCause().getMessage();
- if(errorCondition != null) {
- return message.contains(errorCondition.toString()) &&
- message.contains(errorDescription);
+ if (errorCondition != null) {
+ return message.contains(errorCondition.toString()) && message.contains(errorDescription);
} else {
return message.contains("Unknown error from remote peer");
}
@@ -3435,7 +3440,8 @@
LOG.info("Original peer is at: {}", originalURI);
LOG.info("Final peer is at: {}", finalURI);
- Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
@@ -3445,11 +3451,11 @@
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
- stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withTxnId(equalTo(txnId1));
stateMatcher.withOutcome(nullValue());
TransactionalState txState = new TransactionalState();
- txState.setTxnId(txnId);
+ txState.setTxnId(txnId1);
txState.setOutcome(new Accepted());
originalPeer.expectSaslAnonymous();
@@ -3459,7 +3465,7 @@
originalPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
- originalPeer.expectDeclare(txnId);
+ originalPeer.expectDeclare(txnId1);
originalPeer.expectSenderAttach();
originalPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);
originalPeer.dropAfterLastHandler(10);
@@ -3470,13 +3476,13 @@
finalPeer.expectBegin();
finalPeer.expectBegin();
finalPeer.expectCoordinatorAttach();
- finalPeer.expectDeclare(txnId);
+ finalPeer.expectDeclare(txnId2);
finalPeer.expectSenderAttach();
// Attempt to commit the in-doubt TX will result in rollback and a new TX will be started.
- finalPeer.expectDischarge(txnId, true);
- finalPeer.expectDeclare(txnId);
+ finalPeer.expectDischarge(txnId2, true);
+ finalPeer.expectDeclare(txnId1);
// this rollback comes from the session being closed on connection close.
- finalPeer.expectDischarge(txnId, true);
+ finalPeer.expectDischarge(txnId1, true);
finalPeer.expectClose();
final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
@@ -3552,6 +3558,854 @@
@Repeat(repetitions = 1)
@Test(timeout = 20000)
+ public void testSendAndConnectionDropsRecoveredAsInDoubtTransaction() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ // Send should occurs within transaction #1
+ TransactionalStateMatcher txn1StateMatcher = new TransactionalStateMatcher();
+ txn1StateMatcher.withTxnId(equalTo(txnId1));
+ txn1StateMatcher.withOutcome(nullValue());
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectCoordinatorAttach();
+ originalPeer.expectDeclare(txnId1);
+ originalPeer.expectSenderAttach();
+ // Send is synchronous so we don't respond in order to stall the MessageProducer
+ // in the send call to block recovery from initiating a new transaction which
+ // will then cause the current transaction to become in-doubt and commit should
+ // throw a transaction rolled back exception.
+ originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, false, false, null, false);
+ originalPeer.dropAfterLastHandler();
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+ // Send will be blocked waiting to fire so it will not be filtered by the local
+ // transaction context in-doubt checks since there was no other work pending the
+ // transaction will be fully recovered so the fixed producer must ensure that no
+ // send occurs outside the transaction boundaries.
+ finalPeer.expectCoordinatorAttach();
+ finalPeer.expectDeclare(txnId2);
+ finalPeer.expectDischarge(txnId2, true);
+ finalPeer.expectClose();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.setForceSyncSend(true);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalPeer.getServerPort() == remoteURI.getPort()) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalPeer.getServerPort() == remoteURI.getPort()) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+
+ TextMessage message = session.createTextMessage(text);
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed to send.");
+ }
+
+ assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+ try {
+ session.commit();
+ fail("Transaction should throw rolled back error as an operation is pending on recover.");
+ } catch (TransactionRolledBackException txrbex) {
+ }
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Repeat(repetitions = 1)
+ @Test(timeout = 20000)
+ public void testSecondSendAndConnectionDropsResendsButTransactionRollsBackAsInDoubt() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ // Send should occurs within transaction #1
+ TransactionalStateMatcher txn1StateMatcher = new TransactionalStateMatcher();
+ txn1StateMatcher.withTxnId(equalTo(txnId1));
+ txn1StateMatcher.withOutcome(nullValue());
+
+ // Disposition should occurs within transaction #1 before failover
+ TransactionalState txn1Disposition = new TransactionalState();
+ txn1Disposition.setTxnId(txnId2);
+ txn1Disposition.setOutcome(new Accepted());
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectCoordinatorAttach();
+ originalPeer.expectDeclare(txnId1);
+ originalPeer.expectSenderAttach();
+ originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, txn1Disposition, true);
+ // Send is synchronous so we don't respond in order to stall the MessageProducer
+ // in the send call to block recovery from initiating a new transaction which
+ // will then cause the current transaction to become in-doubt and commit should
+ // throw a transaction rolled back exception.
+ originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, false, false, null, false);
+ originalPeer.dropAfterLastHandler();
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+ // Send will be blocked waiting to fire so it will not be filtered
+ // by the local transaction context in-doubt checks, however since there
+ // was pending transactional work the transaction will be rolled back
+ // on commit and then a new transaction will be activated. The producer
+ // will filter the held send as there is no active transaction.
+ finalPeer.expectCoordinatorAttach();
+ finalPeer.expectDeclare(txnId2);
+ finalPeer.expectDischarge(txnId2, true);
+ finalPeer.expectClose();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.setForceSyncSend(true);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalPeer.getServerPort() == remoteURI.getPort()) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalPeer.getServerPort() == remoteURI.getPort()) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+
+ TextMessage message = session.createTextMessage(text);
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed to send.");
+ }
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed to send.");
+ }
+
+ assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+ try {
+ session.commit();
+ fail("Transaction should have been been in-doubt and a rolled back error thrown.");
+ } catch (TransactionRolledBackException txrbex) {
+ }
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Repeat(repetitions = 1)
+ @Test(timeout = 20000)
+ public void testTransactionalAcknowledgeAfterRecoveredWhileSendBlocked() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent");
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ // The initial send should occur in transaction #1
+ TransactionalStateMatcher transfer1StateMatcher = new TransactionalStateMatcher();
+ transfer1StateMatcher.withTxnId(equalTo(txnId1));
+ transfer1StateMatcher.withOutcome(nullValue());
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectCoordinatorAttach();
+ originalPeer.expectDeclare(txnId1);
+ originalPeer.expectSenderAttach();
+ // Send is synchronous so we don't respond in order to stall the MessageProducer
+ // in the send call to block recovery from initiating a new transaction which
+ // will then cause the current transaction to become in-doubt and commit should
+ // throw a transaction rolled back exception.
+ originalPeer.expectTransfer(messageMatcher, transfer1StateMatcher, false, false, null, false);
+ originalPeer.dropAfterLastHandler();
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+ // Send will be blocked waiting to fire so it will not be filtered
+ // by the local transaction context in-doubt checks, however since there
+ // was pending transactional work the transaction will be rolled back.
+ // The AmqpFixedProducer should filter the send after reconnect as there
+ // won't be an active transaction coordinator until we start a new TXN.
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ finalPeer.expectCoordinatorAttach();
+ finalPeer.expectDeclare(txnId2);
+ finalPeer.expectDischarge(txnId2, true);
+ finalPeer.expectClose();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.setForceSyncSend(true);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalPeer.getServerPort() == remoteURI.getPort()) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalPeer.getServerPort() == remoteURI.getPort()) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+
+ TextMessage message = session.createTextMessage(text);
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed the send after connection dropped.");
+ }
+
+ assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNotNull(consumer.receive(5000));
+
+ try {
+ session.commit();
+ fail("Transaction should have been rolled back");
+ } catch (TransactionRolledBackException txrbex) {
+ }
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Repeat(repetitions = 1)
+ @Test(timeout = 30_000)
+ public void testReceiveAndSendInTransactionFailsCommitWhenConnectionDropsDuringSend() throws Exception {
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+ final TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ // The initial send before failover should arrive in transaction #1
+ final TransactionalStateMatcher transfer1StateMatcher = new TransactionalStateMatcher();
+ transfer1StateMatcher.withTxnId(equalTo(txnId1));
+ transfer1StateMatcher.withOutcome(nullValue());
+
+ // Transactional Acknowledge should happen before the failover then no others should arrive.
+ final TransactionalStateMatcher dispositionStateMatcher = new TransactionalStateMatcher();
+ dispositionStateMatcher.withTxnId(equalTo(txnId1));
+ dispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+ final CountDownLatch transactionRollback = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectCoordinatorAttach();
+ originalPeer.expectDeclare(txnId1);
+ originalPeer.expectReceiverAttach();
+ originalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+ originalPeer.expectDisposition(true, dispositionStateMatcher);
+ originalPeer.expectSenderAttach();
+ originalPeer.expectTransfer(messageMatcher, transfer1StateMatcher, false, false, null, false);
+ originalPeer.dropAfterLastHandler();
+
+ // Following failover the blocked send will be retried but the transaction will have
+ // been marked as in-dbout by the context and no new transactional work will be done
+ // until the commit is called and it throws a transaction rolled back error.
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlow();
+ finalPeer.expectCoordinatorAttach();
+ finalPeer.expectDeclare(txnId2);
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.setForceSyncSend(true);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalPeer.getServerPort() == remoteURI.getPort()) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalPeer.getServerPort() == remoteURI.getPort()) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ messageConsumer.setMessageListener((message) -> {
+ try {
+ session.createProducer(queue).send(session.createTextMessage("sample"));
+ session.commit();
+ } catch (TransactionRolledBackException txnRbEx) {
+ transactionRollback.countDown();
+ } catch (JMSException jmsEx) {
+ throw new RuntimeException("Behaving badly since commit already did", jmsEx);
+ }
+ });
+
+ assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ finalPeer.expectDischarge(txnId2, true);
+ finalPeer.expectClose();
+
+ assertTrue("Should have encounted a Transaction Rollback Error", transactionRollback.await(5, TimeUnit.SECONDS));
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testTransactionDeclareWithNoResponseRecoveredAsInDoubtAndCommitFails() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent");
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ // The initial send should occur in transaction #1
+ TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher();
+ transferStateMatcher.withTxnId(equalTo(txnId1));
+ transferStateMatcher.withOutcome(nullValue());
+
+ // Accept the initial send after failover in transaction #1
+ TransactionalState transferTxnOutcome = new TransactionalState();
+ transferTxnOutcome.setTxnId(txnId1);
+ transferTxnOutcome.setOutcome(new Accepted());
+
+ // Receive call after failover should occur in transaction #1
+ TransactionalStateMatcher txnDispositionStateMatcher = new TransactionalStateMatcher();
+ txnDispositionStateMatcher.withTxnId(equalTo(txnId1));
+ txnDispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+ // Drop the connection after the declare giving a chance for recovery to attempt
+ // to rebuild while the context is still reacting to the failed begin.
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectCoordinatorAttach();
+ originalPeer.expectDeclareButDoNotRespond();
+ originalPeer.dropAfterLastHandler();
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectCoordinatorAttach();
+ finalPeer.expectDeclare(txnId1);
+ finalPeer.expectSenderAttach();
+ // Send will be blocked waiting to fire so it will not be filtered
+ // by the local transaction context in-doubt checks, however since there
+ // was pending transactional work the transaction will be rolled back.
+ finalPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true);
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ finalPeer.expectDisposition(true, txnDispositionStateMatcher);
+ finalPeer.expectDischarge(txnId1, false);
+ finalPeer.expectDeclare(txnId2);
+ finalPeer.expectDischarge(txnId2, true);
+ finalPeer.expectClose();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.setForceSyncSend(true);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalPeer.getServerPort() == remoteURI.getPort()) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalPeer.getServerPort() == remoteURI.getPort()) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("myMessage");
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed the async completion send.");
+ }
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNotNull(consumer.receive(5000));
+
+ try {
+ session.commit();
+ } catch (TransactionRolledBackException txrbex) {
+ fail("Transaction should not have been rolled back");
+ }
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testTransactionCommitWithNoResponseRecoveredAsInDoubtAndPerformsNoWork() throws Exception {
+ doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(true);
+ }
+
+ @Test(timeout = 20000)
+ public void testTransactionRollbackWithNoResponseRecoveredAsInDoubtAndPerformsNoWork() throws Exception {
+ doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(false);
+ }
+
+ private void doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(boolean commit) throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent");
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ // The replayed send should occur in transaction #2
+ TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher();
+ transferStateMatcher.withTxnId(equalTo(txnId2));
+ transferStateMatcher.withOutcome(nullValue());
+
+ // Receive call after failover should occur in transaction #2
+ TransactionalStateMatcher txnDispositionStateMatcher = new TransactionalStateMatcher();
+ txnDispositionStateMatcher.withTxnId(equalTo(txnId2));
+ txnDispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+ // Re-send after failover should occur in TXN #2
+ TransactionalState transferTxnOutcome = new TransactionalState();
+ transferTxnOutcome.setTxnId(txnId2);
+ transferTxnOutcome.setOutcome(new Accepted());
+
+ // Drop the connection after the declare giving a chance for recovery to attempt
+ // to rebuild while the context is still reacting to the failed begin.
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectCoordinatorAttach();
+ originalPeer.expectDeclare(txnId1);
+ originalPeer.expectDischargeButDoNotRespond(txnId1, !commit);
+ originalPeer.expectDeclareButDoNotRespond();
+ originalPeer.dropAfterLastHandler();
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectCoordinatorAttach();
+ finalPeer.expectDeclare(txnId2);
+ finalPeer.expectSenderAttach();
+ // Send will be blocked waiting to fire so it will not be filtered
+ // by the local transaction context in-doubt checks, however since there
+ // was pending transactional work the transaction will be rolled back.
+ finalPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true);
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ finalPeer.expectDisposition(true, txnDispositionStateMatcher);
+ finalPeer.expectDischarge(txnId2, false);
+ finalPeer.expectDeclare(txnId1);
+ finalPeer.expectDischarge(txnId1, true);
+ finalPeer.expectClose();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalPeer.getServerPort() == remoteURI.getPort()) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalPeer.getServerPort() == remoteURI.getPort()) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ if (commit) {
+ try {
+ session.commit();
+ fail("Should have failed on the commit when connection dropped");
+ } catch (TransactionRolledBackException txnRbEx) {
+ // Expected
+ }
+ } else {
+ try {
+ session.rollback();
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed on the rollback when connection dropped");
+ }
+ }
+
+ assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("myMessage");
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed the async completion send.");
+ }
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNotNull(consumer.receive(5000));
+
+ try {
+ session.commit();
+ } catch (TransactionRolledBackException txrbex) {
+ fail("Transaction should not have been rolled back");
+ }
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Repeat(repetitions = 1)
+ @Test(timeout = 20_000)
+ public void testSendWhileOfflinePreventsRecoveredTransactionFromCommitting() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+ // Drop the connection after the declare giving a chance for recovery to attempt
+ // to rebuild while the context is still reacting to the failed begin.
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectCoordinatorAttach();
+ originalPeer.expectDeclare(txnId1);
+ originalPeer.expectSenderAttach();
+ originalPeer.expectDischargeButDoNotRespond(txnId1, false);
+ originalPeer.expectDeclareButDoNotRespond();
+ originalPeer.dropAfterLastHandler();
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectCoordinatorAttach();
+ finalPeer.expectDeclare(txnId2);
+ finalPeer.expectSenderAttach();
+ finalPeer.expectDischarge(txnId2, true);
+ finalPeer.expectDeclare(txnId1);
+ finalPeer.expectDischarge(txnId1, true);
+ finalPeer.expectClose();
+
+ // Need to allow time for the asynchronous send to fire after connection drop in order to ensure
+ // that the send is no-op'd when the TXN is in-doubt
+ final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=1000", originalPeer, finalPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalPeer.getServerPort() == remoteURI.getPort()) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalPeer.getServerPort() == remoteURI.getPort()) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("myMessage");
+
+ try {
+ session.commit();
+ fail("Should have failed on the commit when connection dropped");
+ } catch (TransactionRolledBackException txnRbEx) {
+ // Expected
+ }
+
+ // Following the failed commit the Transaction should be in-doubt and the send
+ // should be skipped since the TXN is in-doubt because we should not have connected
+ // to the final peer yet so a recovery wouldn't have happened and the transaction
+ // state couldn't be marked good since currently there should be no active transaction.
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed the async completion send.");
+ }
+
+ assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+ try {
+ session.commit();
+ fail("Transaction should have been rolled back since a send was skipped.");
+ } catch (TransactionRolledBackException txrbex) {
+ // Expected
+ }
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Repeat(repetitions = 1)
+ @Test(timeout = 20000)
public void testFailoverDoesNotFailPendingAsyncCompletionSend() throws Exception {
try (TestAmqpPeer originalPeer = new TestAmqpPeer();
TestAmqpPeer finalPeer = new TestAmqpPeer();) {