blob: 9242c584848f946d98a131ff0290d83976191f5a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Transactions;
using Amqp;
using Amqp.Framing;
using Amqp.Transactions;
using Apache.NMS;
using Apache.NMS.AMQP.Util;
using NMS.AMQP.Test.TestAmqp;
using NUnit.Framework;
using IConnection = Apache.NMS.IConnection;
using ISession = Apache.NMS.ISession;
namespace NMS.AMQP.Test.Integration
{
[TestFixture]
public class TransactionsIntegrationTest : IntegrationTestFixture
{
[Test, Timeout(20_000)]
public void TestTransactionRolledBackOnSessionClose()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
// Closed session should roll-back the TX with a failed discharge
testPeer.ExpectDischarge(txnId, true);
testPeer.ExpectEnd();
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
session.Close();
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestTransactionCommitFailWithEmptyRejectedDisposition()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
byte[] txnId1 = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId1);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.ExpectSenderAttach();
IMessageProducer producer = session.CreateProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
Action<DeliveryState> stateMatcher = state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
var transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
Assert.IsNull(transactionalState.Outcome);
};
testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
{
Outcome = new Accepted(),
TxnId = txnId1
}, responseSettled: true);
producer.Send(session.CreateMessage());
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with rejected and settled disposition to indicate the commit failed
testPeer.ExpectDischarge(txnId1, dischargeState: false, responseState: new Rejected());
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId2);
Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
// session should roll back on close
testPeer.ExpectDischarge(txnId2, true);
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestProducedMessagesAfterCommitOfSentMessagesFails()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
byte[] txnId1 = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId1);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.ExpectSenderAttach();
IMessageProducer producer = session.CreateProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
Action<DeliveryState> stateMatcher = state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
var transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
Assert.IsNull(transactionalState.Outcome);
};
testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
{
Outcome = new Accepted(),
TxnId = txnId1
}, responseSettled: true);
producer.Send(session.CreateMessage());
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with rejected and settled disposition to indicate the commit failed
testPeer.ExpectDischarge(txnId1, false, new Rejected() { Error = new Error(ErrorCode.InternalError) { Description = "Unknown error" } });
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId2);
Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
stateMatcher = state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
var transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId2, transactionalState.TxnId);
Assert.IsNull(transactionalState.Outcome);
};
testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
{
Outcome = new Accepted(),
TxnId = txnId2
}, responseSettled: true);
testPeer.ExpectDischarge(txnId2, dischargeState: true);
producer.Send(session.CreateMessage());
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestProducedMessagesAfterRollbackSentMessagesFails()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
byte[] txnId1 = { 5, 6, 7, 8 };
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId1);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.ExpectSenderAttach();
IMessageProducer producer = session.CreateProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
Action<DeliveryState> stateMatcher = state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
var transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
Assert.IsNull(transactionalState.Outcome);
};
testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
{
Outcome = new Accepted(),
TxnId = txnId1
}, responseSettled: true);
producer.Send(session.CreateMessage());
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with rejected and settled disposition to indicate the rollback failed
testPeer.ExpectDischarge(txnId1, true, new Rejected() { Error = new Error(ErrorCode.InternalError) { Description = "Unknown error" } });
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
testPeer.ExpectDeclare(txnId2);
Assert.Catch<TransactionRolledBackException>(() => session.Rollback(), "Rollback operation should have failed.");
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
stateMatcher = state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
var transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId2, transactionalState.TxnId);
Assert.IsNull(transactionalState.Outcome);
};
testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
{
Outcome = new Accepted(),
TxnId = txnId2
}, responseSettled: true);
testPeer.ExpectDischarge(txnId2, dischargeState: true);
producer.Send(session.CreateMessage());
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestCommitTransactedSessionWithConsumerReceivingAllMessages()
{
DoCommitTransactedSessionWithConsumerTestImpl(1, 1, false, false);
}
[Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
public void TestCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseBefore()
{
DoCommitTransactedSessionWithConsumerTestImpl(1, 1, true, true);
}
[Test, Timeout(20_000)]
public void TestCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseAfter()
{
DoCommitTransactedSessionWithConsumerTestImpl(1, 1, true, false);
}
[Test, Timeout(20_000)]
public void TestCommitTransactedSessionWithConsumerReceivingSomeMessages()
{
DoCommitTransactedSessionWithConsumerTestImpl(5, 2, false, false);
}
[Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
public void TestCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesBefore()
{
DoCommitTransactedSessionWithConsumerTestImpl(5, 2, true, true);
}
[Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
public void TestCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesAfter()
{
DoCommitTransactedSessionWithConsumerTestImpl(5, 2, true, false);
}
private void DoCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, bool closeConsumer, bool closeBeforeCommit)
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), transferCount);
for (int i = 1; i <= consumeCount; i++)
{
// Then expect a *settled* TransactionalState disposition for each message once received by the consumer
testPeer.ExpectDisposition(settled: true, stateMatcher: state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
var transactionalState = (TransactionalState) state;
Assert.AreEqual(txnId, transactionalState.TxnId);
Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
});
}
IMessageConsumer messageConsumer = session.CreateConsumer(queue);
for (int i = 1; i <= consumeCount; i++)
{
IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
Assert.NotNull(receivedMessage);
Assert.IsInstanceOf<ITextMessage>(receivedMessage);
}
// Expect the consumer to close now
if (closeConsumer && closeBeforeCommit)
{
// Expect the client to then drain off all credit from the link.
testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
// Expect the messages that were not consumed to be released
int unconsumed = transferCount - consumeCount;
for (int i = 1; i <= unconsumed; i++)
{
testPeer.ExpectDispositionThatIsReleasedAndSettled();
}
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the commit succeeded
testPeer.ExpectDischarge(txnId, dischargeState: false);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
testPeer.ExpectDeclare(txnId);
// Now the deferred close should be performed.
testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
messageConsumer.Close();
}
else
{
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the commit succeeded
testPeer.ExpectDischarge(txnId, dischargeState: false);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
testPeer.ExpectDeclare(txnId);
}
session.Commit();
if (closeConsumer && !closeBeforeCommit)
{
testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
// Expect the messages that were not consumed to be released
int unconsumed = transferCount - consumeCount;
for (int i = 1; i <= unconsumed; i++)
{
testPeer.ExpectDispositionThatIsReleasedAndSettled();
}
messageConsumer.Close();
}
testPeer.ExpectDischarge(txnId, dischargeState: true);
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestConsumerWithNoMessageCanCloseBeforeCommit()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlow();
// TODO: qpid-jms extend 2 additional flow links
// 1) Drain related with deferred consumer close, this feature is currently
// not implemented.
// 2) Consumer pull - not implemented
// testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
// testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false);
testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
IMessageConsumer messageConsumer = session.CreateConsumer(queue);
Assert.IsNull(messageConsumer.ReceiveNoWait());
messageConsumer.Close();
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the commit succeeded
testPeer.ExpectDischarge(txnId, dischargeState: false);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
testPeer.ExpectDeclare(txnId);
testPeer.ExpectDischarge(txnId, dischargeState: true);
session.Commit();
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestConsumerWithNoMessageCanCloseBeforeRollback()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlow();
// TODO: qpid-jms extend 2 additional flow links
// 1) Drain related with deferred consumer close, this feature is currently
// not implemented.
// 2) Consumer pull - not implemented
// testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
// testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false);
testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
IMessageConsumer messageConsumer = session.CreateConsumer(queue);
Assert.IsNull(messageConsumer.ReceiveNoWait());
messageConsumer.Close();
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the commit succeeded
testPeer.ExpectDischarge(txnId, dischargeState: true);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
testPeer.ExpectDeclare(txnId);
testPeer.ExpectDischarge(txnId, dischargeState: true);
session.Rollback();
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestProducedMessagesOnTransactedSessionCarryTxnId()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.ExpectSenderAttach();
IMessageProducer producer = session.CreateProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
stateMatcher: state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
TransactionalState transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
Assert.IsNull(transactionalState.Outcome);
},
responseState: new TransactionalState() { TxnId = txnId, Outcome = new Accepted() },
responseSettled: true);
testPeer.ExpectDischarge(txnId, dischargeState: true);
producer.Send(session.CreateMessage());
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestProducedMessagesOnTransactedSessionCanBeReused()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.ExpectSenderAttach();
IMessageProducer producer = session.CreateProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
IMessage message = session.CreateMessage();
for (int i = 0; i < 3; i++)
{
testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
stateMatcher: state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
TransactionalState transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
Assert.IsNull(transactionalState.Outcome);
},
responseState: new TransactionalState() { TxnId = txnId, Outcome = new Accepted() },
responseSettled: true);
message.Properties.SetInt("sequence", i);
producer.Send(message);
}
// Expect rollback on close without a commit call.
testPeer.ExpectDischarge(txnId, dischargeState: true);
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestRollbackTransactedSessionWithConsumerReceivingAllMessages()
{
DoRollbackTransactedSessionWithConsumerTestImpl(1, 1, false);
}
[Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
public void TestRollbackTransactedSessionWithConsumerReceivingAllMessagesThenCloses()
{
DoRollbackTransactedSessionWithConsumerTestImpl(1, 1, true);
}
[Test, Timeout(20_000), Ignore("TODO: Fix")]
public void TestRollbackTransactedSessionWithConsumerReceivingSomeMessages()
{
DoRollbackTransactedSessionWithConsumerTestImpl(5, 2, false);
}
[Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
public void TestRollbackTransactedSessionWithConsumerReceivingSomeMessagesThenCloses()
{
DoRollbackTransactedSessionWithConsumerTestImpl(5, 2, true);
}
private void DoRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, bool closeConsumer)
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), transferCount);
for (int i = 1; i <= consumeCount; i++)
{
// Then expect a *settled* TransactionalState disposition for each message once received by the consumer
testPeer.ExpectDisposition(settled: true, stateMatcher: state =>
{
Assert.IsInstanceOf<TransactionalState>(state);
var transactionalState = (TransactionalState) state;
Assert.AreEqual(txnId, transactionalState.TxnId);
Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
});
}
IMessageConsumer messageConsumer = session.CreateConsumer(queue);
for (int i = 1; i <= consumeCount; i++)
{
IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
Assert.IsNotNull(receivedMessage);
Assert.IsInstanceOf<ITextMessage>(receivedMessage);
}
// Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true, creditMatcher: c => Assert.AreEqual(0, c));
if (closeConsumer)
{
// Expect the messages that were not consumed to be released
int unconsumed = transferCount - consumeCount;
for (int i = 1; i <= unconsumed; i++)
{
testPeer.ExpectDispositionThatIsReleasedAndSettled();
}
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the commit succeeded
testPeer.ExpectDischarge(txnId, dischargeState: false);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
testPeer.ExpectDeclare(txnId);
// Now the deferred close should be performed.
testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
messageConsumer.Close();
}
else
{
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
testPeer.ExpectDischarge(txnId, dischargeState: true);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
testPeer.ExpectDeclare(txnId);
// Expect the messages that were not consumed to be released
int unconsumed = transferCount - consumeCount;
for (int i = 1; i <= unconsumed; i++)
{
testPeer.ExpectDispositionThatIsReleasedAndSettled();
}
// Expect the consumer to be 'started' again as rollback completes
testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: c => Assert.Greater(c, 0));
}
testPeer.ExpectDischarge(txnId, dischargeState: true);
session.Rollback();
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
// TODO:
// TestRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer
// TestRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer
[Test, Timeout(20_000)]
public void TestDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
string queueName = "myQueue";
IQueue queue = session.GetQueue(queueName);
testPeer.ExpectReceiverAttach(linkNameMatcher: Assert.IsNotNull, targetMatcher: Assert.IsNotNull, sourceMatcher: source =>
{
Assert.AreEqual(queueName, source.Address);
Assert.IsFalse(source.Dynamic);
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_ACCEPTED);
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_REJECTED);
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_RELEASED);
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_MODIFIED);
Assert.IsInstanceOf<Modified>(source.DefaultOutcome);
Modified modified = (Modified) source.DefaultOutcome;
Assert.IsTrue(modified.DeliveryFailed);
Assert.IsFalse(modified.UndeliverableHere);
});
testPeer.ExpectLinkFlow();
testPeer.ExpectDischarge(txnId, dischargeState: true);
session.CreateConsumer(queue);
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestCoordinatorLinkSupportedOutcomes()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach(sourceMatcher: s =>
{
Source source = (Source) s;
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_ACCEPTED);
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_REJECTED);
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_RELEASED);
CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_MODIFIED);
});
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId);
connection.CreateSession(AcknowledgementMode.Transactional);
//Expect rollback on close
testPeer.ExpectDischarge(txnId, dischargeState: true);
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestRollbackErrorCoordinatorClosedOnCommit()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId1 = { 5, 6, 7, 8 };
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId1);
testPeer.RemotelyCloseLastCoordinatorLinkOnDischarge(txnId: txnId1, dischargeState: false, nextTxnId: txnId2);
testPeer.ExpectCoordinatorAttach();
testPeer.ExpectDeclare(txnId2);
testPeer.ExpectDischarge(txnId2, dischargeState: true);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Transaction should have rolled back");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestRollbackErrorWhenCoordinatorRemotelyClosed()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
testPeer.RemotelyCloseLastCoordinatorLink();
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
testPeer.WaitForAllMatchersToComplete(2000);
testPeer.ExpectCoordinatorAttach();
testPeer.ExpectDeclare(txnId);
testPeer.ExpectDischarge(txnId, true);
Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Transaction should have rolled back");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestNMSErrorCoordinatorClosedOnRollback()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId1 = { 5, 6, 7, 8 };
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId1);
testPeer.RemotelyCloseLastCoordinatorLinkOnDischarge(txnId: txnId1, dischargeState: true, nextTxnId: txnId2);
testPeer.ExpectCoordinatorAttach();
testPeer.ExpectDeclare(txnId2);
testPeer.ExpectDischarge(txnId2, dischargeState: true);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
Assert.Catch<NMSException>(() => session.Rollback(), "Rollback should have thrown a NMSException");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestNMSExceptionOnRollbackWhenCoordinatorRemotelyClosed()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
testPeer.RemotelyCloseLastCoordinatorLink();
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
testPeer.WaitForAllMatchersToComplete(2000);
testPeer.ExpectCoordinatorAttach();
testPeer.ExpectDeclare(txnId);
testPeer.ExpectDischarge(txnId, dischargeState: true);
Assert.Catch<NMSException>(() => session.Rollback(), "Rollback should have thrown a NMSException");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestSendAfterCoordinatorLinkClosedDuringTX()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.ExpectSenderAttach();
// Close the link, the messages should now just get dropped on the floor.
testPeer.RemotelyCloseLastCoordinatorLink();
IMessageProducer producer = session.CreateProducer(queue);
testPeer.WaitForAllMatchersToComplete(2000);
producer.Send(session.CreateMessage());
// Expect that a new link will be created in order to start the next TX.
txnId = new byte[] { 1, 2, 3, 4 };
testPeer.ExpectCoordinatorAttach();
testPeer.ExpectDeclare(txnId);
// Expect that the session TX will rollback on close.
testPeer.ExpectDischarge(txnId, dischargeState: true);
Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestReceiveAfterCoordinatorLinkClosedDuringTX()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Create a consumer and send it an initial message for receive to process.
testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithNullContent());
// Close the link, the messages should now just get dropped on the floor.
testPeer.RemotelyCloseLastCoordinatorLink();
IMessageConsumer consumer = session.CreateConsumer(queue);
testPeer.WaitForAllMatchersToComplete(2000);
// receiving the message would normally ack it, since the TX is failed this
// should not result in a disposition going out.
IMessage received = consumer.Receive();
Assert.IsNotNull(received);
// Expect that a new link will be created in order to start the next TX.
txnId = new byte[] { 1, 2, 3, 4 };
testPeer.ExpectCoordinatorAttach();
testPeer.ExpectDeclare(txnId);
// Expect that the session TX will rollback on close.
testPeer.ExpectDischarge(txnId, dischargeState: true);
Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestSessionCreateFailsOnDeclareTimeout()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
testPeer.ExpectDeclareButDoNotRespond();
// Expect the AMQP session to be closed due to the NMS session creation failure.
testPeer.ExpectEnd();
// TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional), "Should have timed out waiting for declare.");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestSessionCreateFailsOnDeclareRejection()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Rejected disposition state to indicate failure.
testPeer.ExpectDeclareAndReject();
// Expect the AMQP session to be closed due to the NMS session creation failure.
testPeer.ExpectEnd();
Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional));
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestSessionCreateFailsOnCoordinatorLinkRefusal()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
// Expect coordinator link, refuse it, expect detach reply
string errorMessage = "CoordinatorLinkRefusal-breadcrumb";
testPeer.ExpectCoordinatorAttach(refuseLink: true, error: new Error(ErrorCode.NotImplemented) { Description = errorMessage });
testPeer.ExpectDetach(expectClosed: true, sendResponse: false, replyClosed: false);
// Expect the AMQP session to be closed due to the NMS session creation failure.
testPeer.ExpectEnd();
NMSException exception = Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional));
Assert.IsTrue(exception.Message.Contains(errorMessage), "Expected exception message to contain breadcrumb");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestTransactionRolledBackOnSessionCloseTimesOut()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
// Closed session should roll-back the TX with a failed discharge
testPeer.ExpectDischargeButDoNotRespond(txnId, dischargeState: true);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
// TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
Assert.Catch<NMSException>(() => session.Close(), "Should have timed out waiting for discharge.");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestTransactionRolledBackTimesOut()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId1 = { 5, 6, 7, 8 };
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId1);
// Expect discharge but don't respond so that the request timeout kicks in and fails
// the discharge. The pipelined declare should arrive as well and be discharged as the
// client attempts to recover to a known good state.
testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: true);
// Session should throw from the rollback and then try and recover.
testPeer.ExpectDeclare(txnId2);
testPeer.ExpectDischarge(txnId2, dischargeState: true);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
// TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
Assert.Catch<NMSException>(() => session.Rollback(), "Should have timed out waiting for discharge.");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestTransactionCommitTimesOut()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId1 = { 5, 6, 7, 8 };
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId1);
// Expect discharge but don't respond so that the request timeout kicks in and fails
// the discharge. The pipelined declare should arrive as well and be discharged as the
// client attempts to recover to a known good state.
testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: false);
testPeer.ExpectDeclare(txnId2);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
// TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
Assert.Catch<NMSException>(() => session.Commit(), "Should have timed out waiting for discharge.");
// Session rolls back on close
testPeer.ExpectDischarge(txnId2, dischargeState: true);
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000), Ignore("TODO: Fix")]
public void TestTransactionCommitTimesOutAndNoNextBeginTimesOut()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
byte[] txnId1 = { 5, 6, 7, 8 };
byte[] txnId2 = { 1, 2, 3, 4 };
testPeer.ExpectDeclare(txnId1);
// Expect discharge and don't respond so that the request timeout kicks in
// Expect pipelined declare and don't response so that the request timeout kicks in.
// The commit operation should throw a timed out exception at that point.
testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: false);
testPeer.ExpectDeclareButDoNotRespond();
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
// After the pipelined operations both time out, the session should attempt to
// recover by creating a new TX, then on close the session should roll it back
testPeer.ExpectDeclare(txnId2);
testPeer.ExpectDischarge(txnId2, dischargeState: true);
// TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
Assert.Catch<NMSException>(() => session.Commit(), "Should have timed out waiting for discharge.");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000), Ignore("TODO: Fix")]
public void TestRollbackWithNoResponseForSuspendConsumer()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
// Then expect a *settled* TransactionalState disposition for the message once received by the consumer
testPeer.ExpectDisposition(settled: true, state =>
{
var transactionalState = (TransactionalState) state;
CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
});
// Read one so we try to suspend on rollback
IMessageConsumer messageConsumer = session.CreateConsumer(queue);
IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
Assert.NotNull(receivedMessage);
Assert.IsInstanceOf<ITextMessage>(receivedMessage);
// Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: false);
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
testPeer.ExpectDischarge(txnId, dischargeState: true);
testPeer.ExpectDeclare(txnId);
testPeer.ExpectDischarge(txnId, dischargeState: true);
Assert.Catch<NMSException>(() => session.Rollback(), "Should throw a timed out exception");
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(1000);
}
}
[Test, Timeout(20_000)]
public void TestConsumerMessageOrderOnTransactedSession()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
int messageCount = 10;
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = { 5, 6, 7, 8 };
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Expect the browser enumeration to create a underlying consumer
testPeer.ExpectReceiverAttach();
// Expect initial credit to be sent, respond with some messages that are tagged with
// a sequence number we can use to determine if order is maintained.
testPeer.ExpectLinkFlowRespondWithTransfer(CreateMessageWithNullContent(), count: messageCount, addMessageNumberProperty: true);
for (int i = 1; i <= messageCount; i++)
{
// Then expect an *settled* TransactionalState disposition for each message once received by the consumer
testPeer.ExpectSettledTransactionalDisposition(txnId);
}
IMessageConsumer consumer = session.CreateConsumer(queue);
for (int i = 0; i < messageCount; i++)
{
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(500));
Assert.IsNotNull(message);
Assert.AreEqual(i, message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER));
}
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
testPeer.ExpectDischarge(txnId, true);
testPeer.ExpectEnd();
session.Close();
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(3000);
}
}
[Test, Timeout(20_000)]
public void TestConsumeManyWithSingleTXPerMessage()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
connection.Start();
int messageCount = 10;
testPeer.ExpectBegin();
testPeer.ExpectCoordinatorAttach();
var txnIdQueue = new Queue<byte[]>(3);
txnIdQueue.Enqueue(new byte[] { 1, 2, 3, 4 });
txnIdQueue.Enqueue(new byte[] { 2, 4, 6, 8 });
txnIdQueue.Enqueue(new byte[] { 5, 4, 3, 2 });
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
byte[] txnId = txnIdQueue.Dequeue();
txnIdQueue.Enqueue(txnId);
testPeer.ExpectDeclare(txnId);
ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
IQueue queue = session.GetQueue("myQueue");
// Expect the browser enumeration to create a underlying consumer
testPeer.ExpectReceiverAttach();
// Expect initial credit to be sent, respond with some messages that are tagged with
// a sequence number we can use to determine if order is maintained.
testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithNullContent(), count: messageCount, addMessageNumberProperty: true);
IMessageConsumer consumer = session.CreateConsumer(queue);
for (int i = 0; i < messageCount; i++)
{
// Then expect an *settled* TransactionalState disposition for each message once received by the consumer
testPeer.ExpectSettledTransactionalDisposition(txnId);
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(500));
Assert.NotNull(message);
Assert.AreEqual(i, message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER));
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the commit succeeded
testPeer.ExpectDischarge(txnId, dischargeState: false);
// Expect the next transaction to start.
txnId = txnIdQueue.Dequeue();
txnIdQueue.Enqueue(txnId);
testPeer.ExpectDeclare(txnId);
session.Commit();
}
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
testPeer.ExpectDischarge(txnId, dischargeState: true);
testPeer.ExpectEnd();
session.Close();
testPeer.ExpectClose();
connection.Close();
testPeer.WaitForAllMatchersToComplete(3000);
}
}
}
}