blob: 5eeecdb7b5915263e7c3b5e27c0a9f8f1da61fe8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
import org.apache.qpid.jms.test.testpeer.describedtypes.Error;
import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ErrorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.jms.util.QpidJMSTestRunner;
import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for behavior of Transacted Session operations.
*/
@RunWith(QpidJMSTestRunner.class)
public class TransactionsIntegrationTest extends QpidJmsTestCase {
/** SLF4J logger for this test class. */
private static final Logger LOG = LoggerFactory.getLogger(TransactionsIntegrationTest.class);
/** Fixture the tests use to establish a client {@link Connection} to their {@link TestAmqpPeer}. */
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
/**
 * Closing a transacted session without committing must discharge the open
 * transaction as a rollback before the session is ended.
 */
@Test(timeout=20000)
public void testTransactionRolledBackOnSessionClose() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary declaredTxnId = new Binary(new byte[] { 5, 6, 7, 8 });

        // Script for session creation: begin, coordinator attach, declare.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(declaredTxnId);

        // Script for session close: the TX is rolled back via a failed discharge, then the session ends.
        peer.expectDischarge(declaredTxnId, true);
        peer.expectEnd();

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        txSession.close();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * A commit whose discharge is answered with a {@link Rejected} state that carries no
 * error must fail with a {@link TransactionRolledBackException}; a new transaction
 * is declared afterwards and rolled back when the connection closes.
 */
@Test(timeout=20000)
public void testTransactionCommitFailWithEmptyRejectedDisposition() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        final Binary secondTxnId = new Binary(new byte[] { 1, 2, 3, 4 });

        // Session creation: begin, coordinator attach, then the 'declare' for the
        // first transaction, answered with a Declared state carrying firstTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // A producer is needed so that some work happens inside the transaction.
        peer.expectSenderAttach();
        MessageProducer sender = txSession.createProducer(destination);

        // The sent message must carry a TransactionalState naming firstTxnId with no
        // outcome; the peer replies with a TransactionalState wrapping Accepted.
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher inboundState = new TransactionalStateMatcher();
        inboundState.withTxnId(equalTo(firstTxnId));
        inboundState.withOutcome(nullValue());

        TransactionalState reply = new TransactionalState();
        reply.setTxnId(firstTxnId);
        reply.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, inboundState, reply, true);
        sender.send(txSession.createMessage());

        // The commit's 'discharge' is answered with an empty Rejected state to signal
        // failure, after which the client declares the next transaction.
        peer.expectDischarge(firstTxnId, false, new Rejected());
        peer.expectDeclare(secondTxnId);

        try {
            txSession.commit();
            fail("Commit operation should have failed.");
        } catch (TransactionRolledBackException expectedFailure) {
            // Expected: the rejected discharge is reported as a rolled back transaction.
        }

        // The replacement transaction is rolled back as the connection closes.
        peer.expectDischarge(secondTxnId, true);
        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * After the peer rejects a commit (with an error attached), a message produced
 * afterwards must be sent under the transaction declared following the failure.
 */
@Test(timeout=20000)
public void testProducedMessagesAfterCommitOfSentMessagesFails() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        final Binary secondTxnId = new Binary(new byte[] { 1, 2, 3, 4 });

        // Session creation: begin, coordinator attach, 'declare' answered with firstTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // A producer is needed so that some work happens inside the transaction.
        peer.expectSenderAttach();
        MessageProducer sender = txSession.createProducer(destination);

        // First send: TransactionalState must name firstTxnId and have no outcome;
        // the peer answers with a TransactionalState wrapping Accepted.
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher firstInboundState = new TransactionalStateMatcher();
        firstInboundState.withTxnId(equalTo(firstTxnId));
        firstInboundState.withOutcome(nullValue());

        TransactionalState firstReply = new TransactionalState();
        firstReply.setTxnId(firstTxnId);
        firstReply.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, firstInboundState, firstReply, true);
        sender.send(txSession.createMessage());

        // The commit's 'discharge' is rejected with an error to signal failure, and
        // the client then declares the next transaction.
        Rejected rejection = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
        peer.expectDischarge(firstTxnId, false, rejection);
        peer.expectDeclare(secondTxnId);

        try {
            txSession.commit();
            fail("Commit operation should have failed.");
        } catch (TransactionRolledBackException expectedFailure) {
            // Expected: the rejected discharge is reported as a rolled back transaction.
        }

        // Second send: same payload expectations, but now under secondTxnId.
        TransactionalStateMatcher secondInboundState = new TransactionalStateMatcher();
        secondInboundState.withTxnId(equalTo(secondTxnId));
        secondInboundState.withOutcome(nullValue());

        TransactionalState secondReply = new TransactionalState();
        secondReply.setTxnId(secondTxnId);
        secondReply.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, secondInboundState, secondReply, true);
        // Queued ahead of the send; it is the rollback performed when the connection closes.
        peer.expectDischarge(secondTxnId, true);
        sender.send(txSession.createMessage());

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * After the peer rejects a rollback (with an error attached), the rollback call must
 * throw, and a message produced afterwards must be sent under the transaction
 * declared following the failure.
 */
@Test(timeout=20000)
public void testProducedMessagesAfterRollbackSentMessagesFails() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        final Binary secondTxnId = new Binary(new byte[] { 1, 2, 3, 4 });

        // Session creation: begin, coordinator attach, 'declare' answered with firstTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // A producer is needed so that some work happens inside the transaction.
        peer.expectSenderAttach();
        MessageProducer sender = txSession.createProducer(destination);

        // First send: TransactionalState must name firstTxnId and have no outcome;
        // the peer answers with a TransactionalState wrapping Accepted.
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher firstInboundState = new TransactionalStateMatcher();
        firstInboundState.withTxnId(equalTo(firstTxnId));
        firstInboundState.withOutcome(nullValue());

        TransactionalState firstReply = new TransactionalState();
        firstReply.setTxnId(firstTxnId);
        firstReply.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, firstInboundState, firstReply, true);
        sender.send(txSession.createMessage());

        // The rollback's 'discharge' is rejected with an error to signal failure, and
        // the client then declares the next transaction.
        Rejected rollbackRejection = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
        peer.expectDischarge(firstTxnId, true, rollbackRejection);
        peer.expectDeclare(secondTxnId);

        try {
            txSession.rollback();
            fail("Rollback operation should have failed.");
        } catch (JMSException expectedFailure) {
            // Expected: the rejected discharge makes the rollback call throw.
        }

        // Second send: same payload expectations, but now under secondTxnId.
        TransactionalStateMatcher secondInboundState = new TransactionalStateMatcher();
        secondInboundState.withTxnId(equalTo(secondTxnId));
        secondInboundState.withOutcome(nullValue());

        TransactionalState secondReply = new TransactionalState();
        secondReply.setTxnId(secondTxnId);
        secondReply.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, secondInboundState, secondReply, true);
        // Queued ahead of the send; it is the rollback performed when the connection closes.
        peer.expectDischarge(secondTxnId, true);
        sender.send(txSession.createMessage());

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/** Commit scenario: one message transferred and received, consumer never explicitly closed. */
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
    final int transferCount = 1;
    final int consumeCount = 1;
    final boolean closeConsumer = false;
    final boolean closeBeforeCommit = false;

    doCommitTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer, closeBeforeCommit);
}
/** Commit scenario: one message transferred and received, consumer closed before the commit. */
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseBefore() throws Exception {
    final int transferCount = 1;
    final int consumeCount = 1;
    final boolean closeConsumer = true;
    final boolean closeBeforeCommit = true;

    doCommitTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer, closeBeforeCommit);
}
/** Commit scenario: one message transferred and received, consumer closed after the commit. */
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseAfter() throws Exception {
    final int transferCount = 1;
    final int consumeCount = 1;
    final boolean closeConsumer = true;
    final boolean closeBeforeCommit = false;

    doCommitTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer, closeBeforeCommit);
}
/** Commit scenario: five messages transferred, two received, consumer never explicitly closed. */
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
    final int transferCount = 5;
    final int consumeCount = 2;
    final boolean closeConsumer = false;
    final boolean closeBeforeCommit = false;

    doCommitTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer, closeBeforeCommit);
}
/** Commit scenario: five messages transferred, two received, consumer closed before the commit. */
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesBefore() throws Exception {
    final int transferCount = 5;
    final int consumeCount = 2;
    final boolean closeConsumer = true;
    final boolean closeBeforeCommit = true;

    doCommitTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer, closeBeforeCommit);
}
/** Commit scenario: five messages transferred, two received, consumer closed after the commit. */
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesAfter() throws Exception {
    final int transferCount = 5;
    final int consumeCount = 2;
    final boolean closeConsumer = true;
    final boolean closeBeforeCommit = false;

    doCommitTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer, closeBeforeCommit);
}
/**
 * Shared body for the commit-with-consumer tests: the peer transfers messages to a
 * consumer on a transacted session, the test receives some of them, commits, and
 * optionally closes the consumer either before or after the commit.
 *
 * @param transferCount number of messages the peer transfers to the consumer
 * @param consumeCount number of those messages the test receives; each one is expected
 *        to produce a settled, Accepted disposition carrying the transaction id
 * @param closeConsumer whether the test explicitly closes the consumer
 * @param closeBeforeCommit only relevant when {@code closeConsumer} is true: close the
 *        consumer before ({@code true}) or after ({@code false}) calling commit
 * @throws Exception if the peer, the client, or an assertion fails
 */
private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer, boolean closeBeforeCommit) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final int prefetch = 100;
        final int unconsumed = transferCount - consumeCount;
        final boolean closeFirst = closeConsumer && closeBeforeCommit;
        final boolean closeLast = closeConsumer && !closeBeforeCommit;

        // Pin the prefetch to a fixed known value to reduce breakage if defaults are changed;
        // the drain credit asserted below is derived from it. The option is passed with a
        // leading '?', matching the other callers of this fixture method in this class.
        Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + prefetch);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        // First expect an unsettled 'declare' transfer to the txn coordinator, and
        // reply with a declared disposition state containing the txnId.
        Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
        testPeer.expectDeclare(txnId);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("myQueue");

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount);

        for (int i = 1; i <= consumeCount; i++) {
            // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
            stateMatcher.withTxnId(equalTo(txnId));
            stateMatcher.withOutcome(new AcceptedMatcher());

            testPeer.expectDisposition(true, stateMatcher);
        }

        // Counts every inbound message the connection reports, consumed or not.
        final CountDownLatch expected = new CountDownLatch(transferCount);
        ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {
                expected.countDown();
            }
        });

        MessageConsumer messageConsumer = session.createConsumer(queue);

        for (int i = 1; i <= consumeCount; i++) {
            Message receivedMessage = messageConsumer.receive(3000);

            assertNotNull(receivedMessage);
            assertTrue(receivedMessage instanceof TextMessage);
        }

        // Ensure all the messages arrived so that the matching below is deterministic
        assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(10, TimeUnit.SECONDS));

        if (closeFirst) {
            // Expect the client to then drain off all credit from the link.
            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(prefetch - transferCount)));

            // Expect the messages that were not consumed to be released
            for (int i = 1; i <= unconsumed; i++) {
                testPeer.expectDispositionThatIsReleasedAndSettled();
            }
        }

        // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
        // and reply with accepted and settled disposition to indicate the commit succeeded
        testPeer.expectDischarge(txnId, false);

        // Then expect an unsettled 'declare' transfer to the txn coordinator, and
        // reply with a declared disposition state containing the txnId.
        txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
        testPeer.expectDeclare(txnId);

        if (closeFirst) {
            // The detach is scripted after the discharge/declare: the close requested
            // here is expected to be deferred until the commit has been performed.
            testPeer.expectDetach(true, true, true);

            messageConsumer.close();
        }

        session.commit();

        if (closeLast) {
            testPeer.expectDetach(true, true, true);

            // Expect the messages that were not consumed to be released
            for (int i = 1; i <= unconsumed; i++) {
                testPeer.expectDispositionThatIsReleasedAndSettled();
            }

            messageConsumer.close();
        }

        // The transaction declared after the commit is rolled back as the connection closes.
        testPeer.expectDischarge(txnId, true);
        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * A consumer that received nothing can be closed straight away (its detach is
 * expected before any discharge), and the session can then still be committed.
 */
@Test(timeout = 20000)
public void testConsumerWithNoMessageCanCloseBeforeCommit() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary firstTxnId = new Binary(new byte[] { 1, 2, 3, 4 });
        final Binary nextTxnId = new Binary(new byte[] { 1, 2, 3, 4 });
        final UnsignedInteger queuePrefetch = UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);

        // Session creation: begin, coordinator attach, 'declare' answered with firstTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // Consumer lifecycle: attach, initial flow, then a drain flow and a following
        // non-drain flow both carrying the default queue prefetch as credit
        // (presumably caused by receiveNoWait() finding nothing), and finally the detach.
        peer.expectReceiverAttach();
        peer.expectLinkFlow();
        peer.expectLinkFlow(true, true, equalTo(queuePrefetch));
        peer.expectLinkFlow(false, false, equalTo(queuePrefetch));
        peer.expectDetach(true, true, true);

        MessageConsumer receiver = txSession.createConsumer(destination);
        assertNull(receiver.receiveNoWait());
        receiver.close();

        // Commit: a successful 'discharge' of the first transaction followed by the
        // 'declare' of the next one.
        peer.expectDischarge(firstTxnId, false);
        peer.expectDeclare(nextTxnId);
        // Queued ahead of the commit; it is the rollback performed when the connection closes.
        peer.expectDischarge(nextTxnId, true);
        txSession.commit();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * A consumer that received nothing can be closed straight away (its detach is
 * expected before any discharge), and the session can then still be rolled back.
 */
@Test(timeout = 20000)
public void testConsumerWithNoMessageCanCloseBeforeRollback() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary firstTxnId = new Binary(new byte[] { 1, 2, 3, 4 });
        final Binary nextTxnId = new Binary(new byte[] { 1, 2, 3, 4 });
        final UnsignedInteger queuePrefetch = UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);

        // Session creation: begin, coordinator attach, 'declare' answered with firstTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // Consumer lifecycle: attach, initial flow, then a drain flow and a following
        // non-drain flow both carrying the default queue prefetch as credit
        // (presumably caused by receiveNoWait() finding nothing), and finally the detach.
        peer.expectReceiverAttach();
        peer.expectLinkFlow();
        peer.expectLinkFlow(true, true, equalTo(queuePrefetch));
        peer.expectLinkFlow(false, false, equalTo(queuePrefetch));
        peer.expectDetach(true, true, true);

        MessageConsumer receiver = txSession.createConsumer(destination);
        assertNull(receiver.receiveNoWait());
        receiver.close();

        // Rollback: a rollback 'discharge' of the first transaction followed by the
        // 'declare' of the next one.
        peer.expectDischarge(firstTxnId, true);
        peer.expectDeclare(nextTxnId);
        // Queued ahead of the rollback; it is the rollback performed when the connection closes.
        peer.expectDischarge(nextTxnId, true);
        txSession.rollback();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * A message produced on a transacted session must be transferred with a
 * TransactionalState that names the declared transaction and has no outcome.
 */
@Test(timeout=20000)
public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary declaredTxnId = new Binary(new byte[] { 5, 6, 7, 8 });

        // Session creation: begin, coordinator attach, 'declare' answered with declaredTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(declaredTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // A producer is needed so that some work happens inside the transaction.
        peer.expectSenderAttach();
        MessageProducer sender = txSession.createProducer(destination);

        // What the transfer has to look like ...
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher inboundState = new TransactionalStateMatcher();
        inboundState.withTxnId(equalTo(declaredTxnId));
        inboundState.withOutcome(nullValue());

        // ... and what the peer answers: a TransactionalState wrapping Accepted.
        TransactionalState reply = new TransactionalState();
        reply.setTxnId(declaredTxnId);
        reply.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, inboundState, reply, true);
        // Queued ahead of the send; it is the rollback performed when the connection closes.
        peer.expectDischarge(declaredTxnId, true);

        sender.send(txSession.createMessage());

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * The same {@link Message} instance can be sent repeatedly on a transacted session;
 * every send must be transferred under the declared transaction.
 */
@Test(timeout=20000)
public void testProducedMessagesOnTransactedSessionCanBeReused() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary declaredTxnId = new Binary(new byte[] { 5, 6, 7, 8 });

        // Session creation: begin, coordinator attach, 'declare' answered with declaredTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(declaredTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // A producer is needed so that some work happens inside the transaction.
        peer.expectSenderAttach();
        MessageProducer sender = txSession.createProducer(destination);

        // One message object, re-sent three times with a changing "sequence" property.
        Message reusedMessage = txSession.createMessage();

        for (int sequence = 0; sequence < 3; sequence++) {
            // Each transfer must carry a TransactionalState naming the txn with no
            // outcome; the peer replies with a TransactionalState wrapping Accepted.
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
            payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

            TransactionalStateMatcher inboundState = new TransactionalStateMatcher();
            inboundState.withTxnId(equalTo(declaredTxnId));
            inboundState.withOutcome(nullValue());

            TransactionalState reply = new TransactionalState();
            reply.setTxnId(declaredTxnId);
            reply.setOutcome(new Accepted());

            peer.expectTransfer(payloadMatcher, inboundState, reply, true);

            reusedMessage.setIntProperty("sequence", sequence);
            sender.send(reusedMessage);
        }

        // No commit was issued, so closing is expected to roll the transaction back.
        peer.expectDischarge(declaredTxnId, true);
        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/** Rollback scenario: one message transferred and received, consumer left open. */
@Test(timeout=20000)
public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
    final int transferCount = 1;
    final int consumeCount = 1;
    final boolean closeConsumer = false;

    doRollbackTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer);
}
/** Rollback scenario: one message transferred and received, consumer closed before the rollback. */
@Test(timeout=20000)
public void testRollbackTransactedSessionWithConsumerReceivingAllMessagesThenCloses() throws Exception {
    final int transferCount = 1;
    final int consumeCount = 1;
    final boolean closeConsumer = true;

    doRollbackTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer);
}
/** Rollback scenario: five messages transferred, two received, consumer left open. */
@Test(timeout=20000)
public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
    final int transferCount = 5;
    final int consumeCount = 2;
    final boolean closeConsumer = false;

    doRollbackTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer);
}
/** Rollback scenario: five messages transferred, two received, consumer closed before the rollback. */
@Test(timeout=20000)
public void testRollbackTransactedSessionWithConsumerReceivingSomeMessagesThenCloses() throws Exception {
    final int transferCount = 5;
    final int consumeCount = 2;
    final boolean closeConsumer = true;

    doRollbackTransactedSessionWithConsumerTestImpl(transferCount, consumeCount, closeConsumer);
}
/**
 * Shared body for the rollback-with-consumer tests: the peer transfers messages to a
 * consumer on a transacted session, the test receives some of them, optionally closes
 * the consumer, and then rolls the session back.
 *
 * @param transferCount number of messages the peer transfers to the consumer
 * @param consumeCount number of those messages the test receives; each one is expected
 *        to produce a settled, Accepted disposition carrying the transaction id
 * @param closeConsumer whether the consumer is closed before the rollback is requested
 * @throws Exception if the peer, the client, or an assertion fails
 */
private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        final Binary nextTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        final int unconsumed = transferCount - consumeCount;

        // Session creation: begin, coordinator attach, 'declare' answered with firstTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount);

        // One *settled* TransactionalState(Accepted) disposition per message the test receives.
        for (int i = 0; i < consumeCount; i++) {
            TransactionalStateMatcher acceptedInTxn = new TransactionalStateMatcher();
            acceptedInTxn.withTxnId(equalTo(firstTxnId));
            acceptedInTxn.withOutcome(new AcceptedMatcher());

            peer.expectDisposition(true, acceptedInTxn);
        }

        MessageConsumer receiver = txSession.createConsumer(destination);

        for (int i = 0; i < consumeCount; i++) {
            Message delivered = receiver.receive(3000);

            assertNotNull(delivered);
            assertTrue(delivered instanceof TextMessage);
        }

        // The consumer is 'stopped' ahead of the rollback by issuing a 'drain'.
        peer.expectLinkFlow(true, true, greaterThan(UnsignedInteger.ZERO));

        if (closeConsumer) {
            // Unconsumed messages are released first ...
            for (int i = 0; i < unconsumed; i++) {
                peer.expectDispositionThatIsReleasedAndSettled();
            }

            // ... then the rollback 'discharge' and the 'declare' of the next transaction ...
            peer.expectDischarge(firstTxnId, true);
            peer.expectDeclare(nextTxnId);

            // ... and only then the detach, i.e. the close is deferred.
            peer.expectDetach(true, true, true);

            receiver.close();
        } else {
            // Rollback 'discharge' followed by the 'declare' of the next transaction.
            peer.expectDischarge(firstTxnId, true);
            peer.expectDeclare(nextTxnId);

            // Unconsumed messages are released after the rollback.
            for (int i = 0; i < unconsumed; i++) {
                peer.expectDisposition(true, new ReleasedMatcher());
            }

            // The consumer is 'started' again once the rollback completes.
            peer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
        }

        // Queued ahead of the rollback; it is the rollback performed when the connection closes.
        peer.expectDischarge(nextTxnId, true);
        txSession.rollback();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * When the consumer's prefetch is already filled by the peer, rolling back must not
 * send a 'drain' flow: the only frames expected for the rollback are the discharge
 * and the declare of the next transaction.
 */
@Test(timeout=20000)
public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Prefetch size and number of transferred messages are deliberately the same.
        final int prefetchedCount = 5;
        Connection conn = testFixture.establishConnecton(peer, "?jms.prefetchPolicy.all=" + prefetchedCount);
        conn.start();

        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        final Binary nextTxnId = new Binary(new byte[] { 5, 6, 7, 8 });

        // Session creation: begin, coordinator attach, 'declare' answered with firstTxnId.
        peer.expectBegin();
        peer.expectCoordinatorAttach();
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // Fill the consumer's prefetch; none of these messages is ever received by the test.
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), prefetchedCount);
        txSession.createConsumer(destination);

        peer.expectSenderAttach();
        MessageProducer sender = txSession.createProducer(destination);

        // The produced message is sent under the first transaction and accepted by the peer.
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher inboundState = new TransactionalStateMatcher();
        inboundState.withTxnId(equalTo(firstTxnId));
        inboundState.withOutcome(nullValue());

        TransactionalState reply = new TransactionalState();
        reply.setTxnId(firstTxnId);
        reply.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, inboundState, reply, true);
        sender.send(txSession.createMessage());

        // No 'drain' flow is scripted here: all credit was already used, so the consumer
        // counts as stopped. Only the rollback 'discharge' and the next 'declare' follow.
        peer.expectDischarge(firstTxnId, true);
        peer.expectDeclare(nextTxnId);

        txSession.rollback();

        // The transaction declared after the rollback is rolled back as the connection closes.
        peer.expectDischarge(nextTxnId, true);
        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// Rolls back a transacted session where the 'drain' flow sent to stop the consumer
// is answered by the peer with enough transfers to use up all of the credit, and
// no flow frame is sent back to the client.
@Test(timeout=20000)
public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final int prefetch = 5;
        final String uriOptions = "?jms.prefetchPolicy.all=" + prefetch;

        Connection conn = testFixture.establishConnecton(peer, uriOptions);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // Creating the session sends an unsettled 'declare' to the coordinator; the
        // peer replies with a declared state carrying this txn id.
        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // The consumer grants its credit on attach, but the peer sends nothing yet.
        peer.expectReceiverAttach();
        peer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(prefetch)));
        txSession.createConsumer(destination);

        // A producer is used to provoke work inside the AMQP transaction.
        peer.expectSenderAttach();
        MessageProducer sender = txSession.createProducer(destination);

        // The produced message must be sent within the current transaction.
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher txStateMatcher = new TransactionalStateMatcher();
        txStateMatcher.withTxnId(equalTo(firstTxnId));
        txStateMatcher.withOutcome(nullValue());

        TransactionalState acceptedInTxn = new TransactionalState();
        acceptedInTxn.setTxnId(firstTxnId);
        acceptedInTxn.setOutcome(new Accepted());

        peer.expectTransfer(payloadMatcher, txStateMatcher, acceptedInTxn, true);
        sender.send(txSession.createMessage());

        // Stopping the consumer for rollback issues a 'drain' flow. The peer actions it
        // by filling the prefetch (as if the messages were already in flight) and does
        // NOT send a flow back: the client can tell from the transfers that all of the
        // credit has been used.
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
                prefetch, true, false, equalTo(UnsignedInteger.valueOf(prefetch)), 1, false);

        // Rollback: a 'discharge' for the first txn (answered as successful) ...
        peer.expectDischarge(firstTxnId, true);

        // ... followed by the 'declare' that opens the next transaction.
        final Binary nextTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(nextTxnId);

        txSession.rollback();

        // Closing the connection discharges the still-open transaction.
        peer.expectDischarge(nextTxnId, true);
        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// Checks the source sent when a consumer attaches on a transacted session: it must
// list the four supported outcomes and carry a Modified default outcome with
// delivery-failed equal to true and undeliverable-here null.
@Test(timeout=20000)
public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // Creating the session sends an unsettled 'declare' to the coordinator; the
        // peer replies with a declared state carrying this txn id.
        final Binary txnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(txnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);

        final String address = "myQueue";
        Queue destination = txSession.createQueue(address);

        // Describe the source the receiver link is expected to attach with.
        SourceMatcher expectedSource = new SourceMatcher();
        expectedSource.withAddress(equalTo(address));
        expectedSource.withDynamic(equalTo(false));
        expectedSource.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL));

        ModifiedMatcher expectedDefaultOutcome = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue());
        expectedSource.withDefaultOutcome(expectedDefaultOutcome);

        peer.expectReceiverAttach(notNullValue(), expectedSource);
        peer.expectLinkFlow();

        // Queued ahead of time: the open transaction is discharged when the connection closes.
        peer.expectDischarge(txnId, true);

        txSession.createConsumer(destination);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// Checks that the coordinator link attaches with a source advertising the
// Accepted, Rejected, Released and Modified outcomes, in that order.
@Test(timeout=20000)
public void testCoordinatorLinkSupportedOutcomes() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        // The session begins first, then the coordinator link attaches.
        peer.expectBegin();

        SourceMatcher coordinatorSource = new SourceMatcher();
        coordinatorSource.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL));
        peer.expectCoordinatorAttach(coordinatorSource);

        // Creating the session sends an unsettled 'declare' to the coordinator; the
        // peer replies with a declared state carrying this txn id.
        final Binary txnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(txnId);

        conn.createSession(true, Session.SESSION_TRANSACTED);

        // The open transaction is rolled back when the connection is closed.
        peer.expectDischarge(txnId, true);
        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// The peer remotely closes the coordinator link when the commit's discharge arrives;
// commit() must then surface a TransactionRolledBackException.
@Test(timeout=20000)
public void testRollbackErrorCoordinatorClosedOnCommit() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        final Binary initialTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        final Binary replacementTxnId = new Binary(new byte[] { 1, 2, 3, 4 });

        peer.expectDeclare(initialTxnId);
        // Script the remote close of the coordinator link for when the discharge arrives.
        peer.remotelyCloseLastCoordinatorLinkOnDischarge(initialTxnId, false, true, replacementTxnId);

        // A fresh coordinator link and transaction follow, which is rolled back on close.
        peer.expectCoordinatorAttach();
        peer.expectDeclare(replacementTxnId);
        peer.expectDischarge(replacementTxnId, true);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);

        try {
            txSession.commit();
            fail("Transaction should have rolled back");
        } catch (TransactionRolledBackException expected) {
            LOG.info("Caught expected TransactionRolledBackException");
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// The peer remotely closes the coordinator link right after the transaction is
// declared; a later commit() must surface a TransactionRolledBackException.
@Test(timeout=20000)
public void testRollbackErrorWhenCoordinatorRemotelyClosed() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        final Binary txnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(txnId);
        peer.remotelyCloseLastCoordinatorLink();

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);

        // Let the scripted steps above finish before queueing the next expectations.
        peer.waitForAllHandlersToComplete(2000);

        // A fresh coordinator link and transaction follow, which is rolled back on close.
        peer.expectCoordinatorAttach();
        peer.expectDeclare(txnId);
        peer.expectDischarge(txnId, true);

        try {
            txSession.commit();
            fail("Transaction should have rolled back");
        } catch (TransactionRolledBackException expected) {
            LOG.info("Caught expected TransactionRolledBackException");
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// The peer remotely closes the coordinator link when the rollback's discharge
// arrives; rollback() must then throw a JMSException.
@Test(timeout=20000)
public void testJMSErrorCoordinatorClosedOnRollback() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});

        testPeer.expectDeclare(txnId1);
        // Script the remote close of the coordinator link for when the discharge arrives.
        testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId1, true, true, txnId2);

        // A fresh coordinator link and transaction follow, which is rolled back on close.
        testPeer.expectCoordinatorAttach();
        testPeer.expectDeclare(txnId2);
        testPeer.expectDischarge(txnId2, true);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        try {
            session.rollback();
            // Reaching this line means rollback() returned normally. The message now
            // names the real expectation, matching the sibling remote-close rollback test.
            fail("Rollback should have thrown a JMSException");
        } catch (JMSException ex) {
            LOG.info("Caught expected JMSException");
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// The peer remotely closes the coordinator link right after the transaction is
// declared; a later rollback() must throw a JMSException.
@Test(timeout=20000)
public void testJMSExceptionOnRollbackWhenCoordinatorRemotelyClosed() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        final Binary txnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(txnId);
        peer.remotelyCloseLastCoordinatorLink();

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);

        // Let the scripted steps above finish before queueing the next expectations.
        peer.waitForAllHandlersToComplete(2000);

        // A fresh coordinator link and transaction follow, which is rolled back on close.
        peer.expectCoordinatorAttach();
        peer.expectDeclare(txnId);
        peer.expectDischarge(txnId, true);

        try {
            txSession.rollback();
            fail("Rollback should have thrown a JMSException");
        } catch (JMSException expected) {
            LOG.info("Caught expected JMSException");
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// Sends a message after the coordinator link was remotely closed mid-transaction:
// no transfer is expected for the send, and the following commit() must fail with
// a TransactionRolledBackException.
@Test(timeout=20000)
public void testSendAfterCoordinatorLinkClosedDuringTX() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // Creating the session sends an unsettled 'declare' to the coordinator; the
        // peer replies with a Declared state carrying this txn id.
        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // Attach the producer that will be used inside the transaction.
        peer.expectSenderAttach();

        // Remotely close the coordinator link; sent messages should now simply be dropped.
        peer.remotelyCloseLastCoordinatorLink();

        MessageProducer sender = txSession.createProducer(destination);

        peer.waitForAllHandlersToComplete(2000);

        // No transfer expectation is queued for this send.
        sender.send(txSession.createMessage());

        // A new coordinator link is created to start the next transaction.
        final Binary nextTxnId = new Binary(new byte[] { 1, 2, 3, 4 });
        peer.expectCoordinatorAttach();
        peer.expectDeclare(nextTxnId);

        // That transaction is rolled back when the connection closes.
        peer.expectDischarge(nextTxnId, true);

        try {
            txSession.commit();
            fail("Commit operation should have failed.");
        } catch (TransactionRolledBackException expected) {
            // Expected: the transaction failed when the coordinator link was closed.
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// Receives a message after the coordinator link was remotely closed mid-transaction:
// no disposition is expected for the receive, and the following commit() must fail
// with a TransactionRolledBackException.
@Test(timeout=20000)
public void testReceiveAfterCoordinatorLinkClosedDuringTX() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // Creating the session sends an unsettled 'declare' to the coordinator; the
        // peer replies with a Declared state carrying this txn id.
        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        // Attach a consumer and hand it one message for receive() to process.
        DescribedType nullBody = new AmqpValueDescribedType(null);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        // Remotely close the coordinator link so the current transaction is failed.
        peer.remotelyCloseLastCoordinatorLink();

        MessageConsumer receiver = txSession.createConsumer(destination);

        peer.waitForAllHandlersToComplete(2000);

        // Receiving would normally ack the message; with the TX failed, no
        // disposition should go out (none is queued as an expectation).
        Message delivered = receiver.receive();
        assertNotNull(delivered);

        // A new coordinator link is created to start the next transaction.
        final Binary nextTxnId = new Binary(new byte[] { 1, 2, 3, 4 });
        peer.expectCoordinatorAttach();
        peer.expectDeclare(nextTxnId);

        // That transaction is rolled back when the connection closes.
        peer.expectDischarge(nextTxnId, true);

        try {
            txSession.commit();
            fail("Commit operation should have failed.");
        } catch (TransactionRolledBackException expected) {
            // Expected: the transaction failed when the coordinator link was closed.
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// The peer never answers the 'declare', so creating the transacted session must
// fail with a JmsOperationTimedOutException once the 500ms request timeout expires.
@Test(timeout=20000)
public void testSessionCreateFailsOnDeclareTimeout() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
        connection.setRequestTimeout(500);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();
        testPeer.expectDeclareButDoNotRespond();
        // Expect the AMQP session to be closed due to the JMS session creation failure.
        testPeer.expectEnd();

        try {
            connection.createSession(true, Session.SESSION_TRANSACTED);
            fail("Should have timed out waiting for declare.");
        } catch (JmsOperationTimedOutException jmsEx) {
            // Expected
        } catch (Throwable error) {
            // Log before failing: fail() always throws, so anything placed after it
            // never runs. The caught error is also included in the failure message.
            LOG.error("Caught -> ", error);
            fail("Should have caught an timed out exception: " + error);
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// The peer rejects the 'declare', so creating the transacted session must throw a
// JMSException and the underlying AMQP session must be ended.
@Test(timeout=20000)
public void testSessionCreateFailsOnDeclareRejection() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // The unsettled 'declare' sent to the coordinator is answered with a
        // Rejected disposition state to signal failure.
        peer.expectDeclareAndReject();

        // The failed JMS session creation causes the AMQP session to be ended.
        peer.expectEnd();

        try {
            conn.createSession(true, Session.SESSION_TRANSACTED);
            fail("should have thrown");
        } catch (JMSException expected) {
            // Expected: the declare was rejected.
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// The peer refuses the coordinator link, so creating the transacted session must
// throw a JMSException whose message contains the error description from the peer.
@Test(timeout=20000)
public void testSessionCreateFailsOnCoordinatorLinkRefusal() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();

        // Refuse the coordinator attach with a recognisable description, then expect
        // the detach reply from the client.
        final String breadcrumb = "CoordinatorLinkRefusal-breadcrumb";
        peer.expectCoordinatorAttach(true, false, AmqpError.NOT_IMPLEMENTED, breadcrumb);
        peer.expectDetach(true, false, false);

        // The failed JMS session creation causes the AMQP session to be ended.
        peer.expectEnd();

        try {
            conn.createSession(true, Session.SESSION_TRANSACTED);
            fail("should have thrown");
        } catch (JMSException refused) {
            String detail = refused.getMessage();
            assertNotNull(detail);
            assertTrue("Expected exception message to contain breadcrumb", detail.contains(breadcrumb));
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// Closing a transacted session rolls back its transaction; the peer never answers
// that discharge, so session.close() must fail with a JmsOperationTimedOutException
// once the 500ms request timeout expires.
@Test(timeout=20000)
public void testTransactionRolledBackOnSessionCloseTimesOut() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
        connection.setRequestTimeout(500);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        testPeer.expectDeclare(txnId);
        // Closed session should roll-back the TX with a failed discharge
        testPeer.expectDischargeButDoNotRespond(txnId, true);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        try {
            session.close();
            // The unanswered frame here is the discharge, not the declare.
            fail("Should have timed out waiting for discharge.");
        } catch (JmsOperationTimedOutException jmsEx) {
            // Expected
        } catch (Throwable error) {
            // Log before failing: fail() always throws, so anything placed after it
            // never runs. The caught error is also included in the failure message.
            LOG.error("Caught -> ", error);
            fail("Should have caught an timed out exception: " + error);
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// The peer never answers the rollback's discharge, so session.rollback() must fail
// with a JmsOperationTimedOutException; the session then recovers with a new
// transaction that is rolled back on close.
@Test(timeout=20000)
public void testTransactionRolledBackTimesOut() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
        connection.setRequestTimeout(500);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});

        testPeer.expectDeclare(txnId1);

        // Expect discharge but don't respond so that the request timeout kicks in and fails
        // the discharge. The pipelined declare should arrive as well and be discharged as the
        // client attempts to recover to a known good state.
        testPeer.expectDischargeButDoNotRespond(txnId1, true);

        // Session should throw from the rollback and then try and recover.
        testPeer.expectDeclare(txnId2);
        testPeer.expectDischarge(txnId2, true);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        try {
            session.rollback();
            // The unanswered frame here is the discharge, not the declare.
            fail("Should have timed out waiting for discharge.");
        } catch (JmsOperationTimedOutException jmsEx) {
            // Expected
        } catch (Throwable error) {
            // Log before failing: fail() always throws, so anything placed after it
            // never runs. The caught error is also included in the failure message.
            LOG.error("Caught -> ", error);
            fail("Should have caught an timed out exception: " + error);
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// The peer never answers the commit's discharge, so session.commit() must fail with
// a JmsOperationTimedOutException; the pipelined declare for the next transaction
// succeeds and that transaction is rolled back on close.
@Test(timeout=20000)
public void testTransactionCommitTimesOut() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
        connection.setRequestTimeout(500);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});

        testPeer.expectDeclare(txnId1);

        // Expect discharge but don't respond so that the request timeout kicks in and fails
        // the discharge. The pipelined declare should arrive as well and we respond with
        // successful declare.
        testPeer.expectDischargeButDoNotRespond(txnId1, false);
        testPeer.expectDeclare(txnId2);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        try {
            session.commit();
            // The unanswered frame here is the discharge, not the declare.
            fail("Should have timed out waiting for discharge.");
        } catch (JmsOperationTimedOutException jmsEx) {
            // Expected
        } catch (Throwable error) {
            // Log before failing: fail() always throws, so anything placed after it
            // never runs. The caught error is also included in the failure message.
            LOG.error("Caught -> ", error);
            fail("Should have caught an timed out exception: " + error);
        }

        // Session rolls back on close
        testPeer.expectDischarge(txnId2, true);
        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// Neither the commit's discharge nor the pipelined declare is answered by the peer,
// so session.commit() must fail with a JmsOperationTimedOutException; the session
// then recovers with a new transaction that is rolled back on close.
@Test(timeout=20000)
public void testTransactionCommitTimesOutAndNoNextBeginTimesOut() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
        connection.setRequestTimeout(500);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});

        testPeer.expectDeclare(txnId1);

        // Expect discharge and don't respond so that the request timeout kicks in.
        // Expect pipelined declare and don't respond so that the request timeout kicks in.
        // The commit operation should throw a timed out exception at that point.
        testPeer.expectDischargeButDoNotRespond(txnId1, false);
        testPeer.expectDeclareButDoNotRespond();

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        // After the pipelined operations both time out, the session should attempt to
        // recover by creating a new TX, then on close the session should roll it back
        testPeer.expectDeclare(txnId2);
        testPeer.expectDischarge(txnId2, true);

        try {
            session.commit();
            fail("Should have timed out waiting for declare.");
        } catch (JmsOperationTimedOutException jmsEx) {
            // Expected
        } catch (Throwable error) {
            // Log before failing: fail() always throws, so anything placed after it
            // never runs. The caught error is also included in the failure message.
            LOG.error("Caught -> ", error);
            fail("Should have caught an timed out exception: " + error);
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// With amqp.drainTimeout=1000, the 'drain' flow issued to stop the consumer for
// rollback gets no reply from the peer; rollback() must then throw a
// JmsOperationTimedOutException.
@Test(timeout = 20000)
public void testRollbackWithNoResponseForSuspendConsumer() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer, "?amqp.drainTimeout=1000");
        conn.start();

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // Creating the session sends an unsettled 'declare' to the coordinator; the
        // peer replies with a declared state carrying this txn id.
        final Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(firstTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);

        // Once the consumer receives the message, a *settled* TransactionalState
        // disposition with an Accepted outcome is expected for it.
        TransactionalStateMatcher acceptedInTxn = new TransactionalStateMatcher();
        acceptedInTxn.withTxnId(equalTo(firstTxnId));
        acceptedInTxn.withOutcome(new AcceptedMatcher());
        peer.expectDisposition(true, acceptedInTxn);

        // Consume one message so that rollback has to suspend the consumer.
        MessageConsumer receiver = txSession.createConsumer(destination);
        Message delivered = receiver.receive(3000);

        assertNotNull(delivered);
        assertTrue(delivered instanceof TextMessage);

        // The consumer is 'stopped' before rollback by a 'drain' flow; this
        // expectation does not script any response to it.
        peer.expectLinkFlow(true, false, greaterThan(UnsignedInteger.ZERO));

        // A 'discharge' for the first txn, answered as a successful rollback ...
        peer.expectDischarge(firstTxnId, true);

        // ... then the 'declare' for the next transaction, which is itself
        // discharged later.
        final Binary nextTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        peer.expectDeclare(nextTxnId);
        peer.expectDischarge(nextTxnId, true);

        try {
            txSession.rollback();
            fail("Should throw a timed out exception");
        } catch (JmsOperationTimedOutException expected) {
            // Expected: the drain was never answered.
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// Receives a batch of sequence-numbered messages on a transacted session and checks
// that they are delivered to the application in the order the peer sent them.
@Test(timeout=30000)
@Repeat(repetitions = 1)
public void testConsumerMessageOrderOnTransactedSession() throws IOException, Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final int total = 10;

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // Creating the session sends an unsettled 'declare' transfer to the
        // coordinator; the peer replies with a declared state carrying this txn id.
        final Binary txnId = new Binary(new byte[] { 1, 2, 3, 4 });
        TransferPayloadCompositeMatcher declarePayload = new TransferPayloadCompositeMatcher();
        declarePayload.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
        peer.expectTransfer(declarePayload, nullValue(), new Declared().setTxnId(txnId), true);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        DescribedType nullBody = new AmqpValueDescribedType(null);

        // The consumer's receiver link attaches.
        peer.expectReceiverAttach();

        // On the initial credit flow, respond with messages tagged with a sequence
        // number that lets the test verify ordering.
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody,
                total, false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true);

        // One *settled* TransactionalState disposition (Accepted) per received message.
        for (int n = 0; n < total; n++) {
            TransactionalStateMatcher acceptedInTxn = new TransactionalStateMatcher();
            acceptedInTxn.withTxnId(equalTo(txnId));
            acceptedInTxn.withOutcome(new AcceptedMatcher());
            peer.expectDisposition(true, acceptedInTxn);
        }

        MessageConsumer receiver = txSession.createConsumer(destination);

        for (int expectedNumber = 0; expectedNumber < total; expectedNumber++) {
            Message delivered = receiver.receive(500);
            assertNotNull(delivered);
            assertEquals(expectedNumber, delivered.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER));
        }

        // Closing the session rolls back the open transaction with a 'discharge',
        // then ends the AMQP session.
        peer.expectDischarge(txnId, true);
        peer.expectEnd();
        txSession.close();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(3000);
    }
}
// Consumes a batch of sequence-numbered messages, committing after every single
// message, while the peer cycles through a small pool of transaction ids.
@Test(timeout=60000)
public void testConsumeManyWithSingleTXPerMessage() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        final int total = 10;

        peer.expectBegin();
        peer.expectCoordinatorAttach();

        // Pool of txn ids that is rotated: the head is taken and re-queued at the tail.
        Deque<Binary> txnIdPool = new ArrayDeque<>(3);
        txnIdPool.offer(new Binary(new byte[] { 1, 2, 3, 4 }));
        txnIdPool.offer(new Binary(new byte[] { 2, 4, 6, 8 }));
        txnIdPool.offer(new Binary(new byte[] { 5, 4, 3, 2 }));

        // Creating the session sends an unsettled 'declare' to the coordinator; the
        // peer replies with a declared state carrying the first id from the pool.
        Binary currentTxnId = txnIdPool.removeFirst();
        txnIdPool.addLast(currentTxnId);
        peer.expectDeclare(currentTxnId);

        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue destination = txSession.createQueue("myQueue");

        DescribedType nullBody = new AmqpValueDescribedType(null);

        // The consumer's receiver link attaches.
        peer.expectReceiverAttach();

        // On the initial credit flow, respond with messages tagged with a sequence
        // number that lets the test verify ordering.
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody,
                total, false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true);

        MessageConsumer receiver = txSession.createConsumer(destination);

        for (int expectedNumber = 0; expectedNumber < total; expectedNumber++) {
            // A *settled* TransactionalState disposition (Accepted) in the current
            // transaction is expected once the message is received.
            TransactionalStateMatcher acceptedInTxn = new TransactionalStateMatcher();
            acceptedInTxn.withTxnId(equalTo(currentTxnId));
            acceptedInTxn.withOutcome(new AcceptedMatcher());
            peer.expectDisposition(true, acceptedInTxn);

            Message delivered = receiver.receive(500);
            assertNotNull(delivered);
            assertEquals(expectedNumber, delivered.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER));

            // The commit sends a 'discharge' for the current transaction, answered as
            // a successful commit ...
            peer.expectDischarge(currentTxnId, false);

            // ... and then declares the next transaction using the next pooled id.
            currentTxnId = txnIdPool.removeFirst();
            txnIdPool.addLast(currentTxnId);
            peer.expectDeclare(currentTxnId);

            txSession.commit();
        }

        // Closing the session rolls back the last, still-open transaction with a
        // 'discharge', then ends the AMQP session.
        peer.expectDischarge(currentTxnId, true);
        peer.expectEnd();
        txSession.close();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(3000);
    }
}
// The peer answers the 'declare' with a declared state that has no txn-id, which is
// illegal. The client must close the connection with an amqp:decode-error, fail
// session creation with a JMSException, and notify the ExceptionListener with a
// JmsConnectionFailedException carrying the expected message.
@Test(timeout=20000)
public void testTransactionDeclaredDispositionWithoutTxnId() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        // Capture the first exception reported to the listener and signal its arrival.
        AtomicReference<JMSException> failure = new AtomicReference<>();
        CountDownLatch exceptionListenerFired = new CountDownLatch(1);
        connection.setExceptionListener(jmse -> {
            failure.compareAndSet(null, jmse);
            exceptionListenerFired.countDown();
        });

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        // Expect declare, reply declared but without a txn-id, which is illegal.
        testPeer.expectDeclare(null);

        ErrorMatcher errorMatcher = new ErrorMatcher()
                .withCondition(equalTo(AmqpError.DECODE_ERROR))
                .withDescription(equalTo("The txn-id field cannot be omitted"));

        testPeer.expectClose(errorMatcher, false);
        testPeer.setSuppressReadExceptionOnClose(true);

        try {
            connection.createSession(true, Session.SESSION_TRANSACTED);
            fail("expected exception to be thrown");
        } catch (JMSException e) {
            // Expected
        }

        assertTrue("The ExceptionListener should have been alerted", exceptionListenerFired.await(3, TimeUnit.SECONDS));

        JMSException ex = failure.get();
        assertTrue("Unexpected exception type: " + ex, ex instanceof JmsConnectionFailedException);
        // This assertion checks the message text, so its reason says so (it used to
        // be labelled as a type check).
        MatcherAssert.assertThat("Unexpected exception message: ", ex.getMessage(),
                equalTo("The JMS connection has failed: Error in proton Transport: The txn-id field cannot be omitted [condition = amqp:decode-error]"));

        testPeer.waitForAllHandlersToComplete(1000);

        connection.close(); // Already nuked under the covers due to txn-id being missing
    }
}
}