Merge pull request #36 from Havret/transactions_integration_tests
AMQNET-627: Transactions implementation adjustments
diff --git a/apache-nms-amqp.sln.DotSettings b/apache-nms-amqp.sln.DotSettings
index 8e25730..5c57e20 100644
--- a/apache-nms-amqp.sln.DotSettings
+++ b/apache-nms-amqp.sln.DotSettings
@@ -1,3 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JMS/@EntryIndexedValue">JMS</s:String>
+ <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=TX/@EntryIndexedValue">TX</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Amqp/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index f291f98..fdf0f49 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -49,8 +49,8 @@
</PropertyGroup>
<ItemGroup>
- <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt"/>
- <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt"/>
+ <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt" />
+ <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt" />
</ItemGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
@@ -74,8 +74,8 @@
<ItemGroup>
<!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
- <PackageReference Include="AMQPNetLite.Core" Version="2.2.0"/>
- <PackageReference Include="Apache.NMS" Version="1.8.0"/>
- <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0"/>
+ <PackageReference Include="AMQPNetLite.Core" Version="2.3.0" />
+ <PackageReference Include="Apache.NMS" Version="1.8.0" />
+ <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
</ItemGroup>
</Project>
diff --git a/src/NMS.AMQP/Message/InboundMessageDispatch.cs b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
index e069a64..a611614 100644
--- a/src/NMS.AMQP/Message/InboundMessageDispatch.cs
+++ b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
@@ -28,5 +28,10 @@
public int RedeliveryCount => Message?.Facade.RedeliveryCount ?? 0;
public bool EnqueueFirst { get; set; }
+
+        /// <summary>
+        /// Builds a short diagnostic description of this dispatch for log output.
+        /// </summary>
+        /// <returns>
+        /// The dispatch type name followed by the message id (empty when no message is attached)
+        /// and the consumer id.
+        /// </returns>
+        public override string ToString()
+        {
+            // Message can be unset (RedeliveryCount guards against that too). This method is used
+            // when formatting log and error messages, so it must not throw on a null Message.
+            return $"[{nameof(InboundMessageDispatch)}] MessageId: {Message?.NMSMessageId}, {nameof(ConsumerId)}: {ConsumerId}";
+        }
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsLocalTransactionContext.cs b/src/NMS.AMQP/NmsLocalTransactionContext.cs
index 998c8de..76c2be4 100644
--- a/src/NMS.AMQP/NmsLocalTransactionContext.cs
+++ b/src/NMS.AMQP/NmsLocalTransactionContext.cs
@@ -157,7 +157,7 @@
try
{
- await this.connection.Commit(this.transactionInfo, nextTx);
+ await this.connection.Commit(this.transactionInfo, nextTx).ConfigureAwait(false);
OnTransactionCommitted();
Reset();
this.transactionInfo = nextTx;
@@ -167,6 +167,10 @@
Tracer.Info($"Commit failed for transaction :{oldTransactionId}");
throw;
}
+ catch (Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
finally
{
try
@@ -176,7 +180,7 @@
// one to recover our state.
if (nextTx.ProviderTxId == null)
{
- await Begin();
+ await Begin().ConfigureAwait(false);
}
}
catch (Exception e)
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 9904098..f8930f1 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -67,7 +67,25 @@
internal async Task Begin()
{
await Connection.CreateResource(SessionInfo).ConfigureAwait(false);
- await TransactionContext.Begin().ConfigureAwait(false);
+
+ try
+ {
+ // We always keep an open TX if transacted, so start one as soon as the session resource has been created.
+ await TransactionContext.Begin().ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // Beginning the TX failed: destroy the session resource created above before rethrowing.
+ try
+ {
+ await Connection.DestroyResource(SessionInfo).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // Deliberately ignored so that the original failure from TransactionContext.Begin() is the one rethrown below.
+ }
+ throw;
+ }
}
public void Close()
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index b342750..685d9a3 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -239,17 +239,7 @@
envelope.IsDelivered = true;
break;
case AckType.ACCEPTED:
- AmqpTransactionContext transactionalState = session.TransactionContext;
- if (transactionalState != null)
- {
- receiverLink.Complete(message, transactionalState.GetTxnAcceptState());
- transactionalState.RegisterTxConsumer(this);
- }
- else
- {
- receiverLink.Accept(message);
- }
- RemoveMessage(envelope);
+ HandleAccepted(envelope, message);
break;
case AckType.RELEASED:
receiverLink.Release(message);
@@ -260,8 +250,8 @@
RemoveMessage(envelope);
break;
default:
- Tracer.Error($"Unsupported Ack Type for message: {envelope.Message}");
- throw new ArgumentException($"Unsupported Ack Type for message: {envelope.Message}");
+ Tracer.ErrorFormat("Unsupported Ack Type for message: {0}", envelope);
+ throw new ArgumentException($"Unsupported Ack Type for message: {envelope}");
}
}
else
@@ -270,6 +260,32 @@
}
}
+        /// <summary>
+        /// Applies an ACCEPTED acknowledgement to a message and then calls RemoveMessage for its envelope.
+        /// </summary>
+        /// <param name="envelope">The dispatch wrapper of the message being acknowledged.</param>
+        /// <param name="message">The underlying AMQP message to settle on the receiver link.</param>
+        private void HandleAccepted(InboundMessageDispatch envelope, global::Amqp.Message message)
+        {
+            Tracer.DebugFormat("Accepted Ack of message: {0}", envelope);
+
+            if (!session.IsTransacted)
+            {
+                // Non-transacted session: accept the delivery directly.
+                receiverLink.Accept(message);
+            }
+            else if (session.IsTransactionFailed)
+            {
+                // The session reports its transaction as failed, so nothing is sent for this message.
+                Tracer.DebugFormat("Skipping ack of message {0} in failed transaction.", envelope);
+            }
+            else
+            {
+                // Complete the delivery with the transaction's accept state and register this
+                // consumer with the transaction context.
+                var txContext = session.TransactionContext;
+                receiverLink.Complete(message, txContext.GetTxnAcceptState());
+                txContext.RegisterTxConsumer(this);
+            }
+
+            RemoveMessage(envelope);
+        }
+
private void AddMessage(InboundMessageDispatch envelope)
{
lock (syncRoot)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpLinkExtensions.cs b/src/NMS.AMQP/Provider/Amqp/AmqpLinkExtensions.cs
new file mode 100644
index 0000000..6f850b1
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpLinkExtensions.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading.Tasks;
+using Amqp;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+    /// <summary>
+    /// Extension helpers for AMQP links.
+    /// </summary>
+    internal static class AmqpLinkExtensions
+    {
+        /// <summary>
+        /// Reports whether the link's state is <see cref="LinkState.DetachPipe"/> or any state ordered after it.
+        /// </summary>
+        /// <param name="link">The link whose state is inspected.</param>
+        /// <returns><c>true</c> when <c>link.LinkState &gt;= LinkState.DetachPipe</c>; otherwise <c>false</c>.</returns>
+        internal static bool IsDetaching(this Link link) => link.LinkState >= LinkState.DetachPipe;
+
+        /// <summary>
+        /// Sends a message on the link through an <see cref="AmqpSendTask"/> and returns that task.
+        /// </summary>
+        /// <param name="link">The sender link to send on.</param>
+        /// <param name="message">The message to send.</param>
+        /// <param name="deliveryState">The delivery state passed along with the send.</param>
+        /// <param name="timeoutMillis">The timeout value handed to the <see cref="AmqpSendTask"/>.</param>
+        /// <returns>The task exposed by the created <see cref="AmqpSendTask"/>.</returns>
+        internal static Task<Outcome> SendAsync(this SenderLink link, global::Amqp.Message message, DeliveryState deliveryState, long timeoutMillis)
+        {
+            var sendTask = new AmqpSendTask(link, message, deliveryState, timeoutMillis);
+            return sendTask.Task;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index d3982dd..3eb8e01 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -130,7 +130,11 @@
try
{
-
+ // If the transaction has failed due to remote termination etc then we just indicate
+ // the send has succeeded until the a new transaction is started.
+ if (session.IsTransacted && session.IsTransactionFailed)
+ return;
+
var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
if (envelope.SendAsync)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs
new file mode 100644
index 0000000..f090edd
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp;
+using Amqp.Framing;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+    /// <summary>
+    /// A <see cref="TaskCompletionSource{TResult}"/> that sends a message on a sender link and completes
+    /// with the outcome reported for it, or faults with a <see cref="TimeoutException"/> when the timer
+    /// elapses first.
+    /// </summary>
+    internal class AmqpSendTask : TaskCompletionSource<Outcome>
+    {
+        private readonly Timer timer;
+
+        /// <summary>
+        /// Starts the optional timeout timer and sends the message.
+        /// </summary>
+        /// <param name="link">The sender link to send on.</param>
+        /// <param name="message">The message to send.</param>
+        /// <param name="deliveryState">The delivery state passed along with the send.</param>
+        /// <param name="timeoutMillis">
+        /// Milliseconds to wait for an outcome; no timer is created when this equals
+        /// <c>ConnectionInfo.INFINITE</c>.
+        /// </param>
+        public AmqpSendTask(SenderLink link, global::Amqp.Message message, DeliveryState deliveryState, long timeoutMillis)
+            // Continuations must not run inline on the thread that delivers the outcome or fires the
+            // timer; this matches how AmqpSession.Start creates its completion source.
+            : base(TaskCreationOptions.RunContinuationsAsynchronously)
+        {
+            if (timeoutMillis != ConnectionInfo.INFINITE)
+            {
+                // One-shot timer: fires once after timeoutMillis and never repeats.
+                this.timer = new Timer(OnTimer, this, timeoutMillis, Timeout.Infinite);
+            }
+
+            try
+            {
+                link.Send(message, deliveryState, OnOutcome, this);
+            }
+            catch (Exception e)
+            {
+                this.timer?.Dispose();
+
+                // The timer may already have faulted the task. TrySetException keeps the constructor
+                // from throwing InvalidOperationException in that case.
+                this.TrySetException(ExceptionSupport.Wrap(e));
+            }
+        }
+
+        /// <summary>
+        /// Outcome callback passed to the send: stops the timer and completes the task with the outcome.
+        /// </summary>
+        private static void OnOutcome(ILink link, global::Amqp.Message message, Outcome outcome, object state)
+        {
+            var thisPtr = (AmqpSendTask) state;
+            thisPtr.timer?.Dispose();
+            thisPtr.TrySetResult(outcome);
+        }
+
+        /// <summary>
+        /// Timer callback: faults the task with a <see cref="TimeoutException"/> unless it has already completed.
+        /// </summary>
+        private static void OnTimer(object state)
+        {
+            var thisPtr = (AmqpSendTask) state;
+
+            // With a very short due time the callback can run before the constructor has stored the
+            // timer in the field, so the field may still be null here.
+            thisPtr.timer?.Dispose();
+            thisPtr.TrySetException(new TimeoutException());
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
index ee81348..2fb8ecb 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
@@ -53,6 +53,10 @@
public IEnumerable<AmqpConsumer> Consumers => consumers.Values.ToArray();
public Id SessionId => SessionInfo.Id;
+        /// <summary>
+        /// Whether this session is transacted, as recorded in its <see cref="SessionInfo"/>.
+        /// </summary>
+        internal bool IsTransacted
+        {
+            get { return SessionInfo.IsTransacted; }
+        }
+
+        /// <summary>
+        /// Whether the transaction context reports a failed transaction; <c>false</c> when the session
+        /// has no transaction context.
+        /// </summary>
+        internal bool IsTransactionFailed
+        {
+            get
+            {
+                var context = TransactionContext;
+                return context != null && context.IsTransactionFailed;
+            }
+        }
+
public Task Start()
{
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -82,7 +86,6 @@
public void Close()
{
TimeSpan timeout = TimeSpan.FromMilliseconds(SessionInfo.closeTimeout);
- TransactionContext?.Close(timeout);
UnderlyingSession.Close(timeout);
Connection.RemoveSession(SessionInfo.Id);
}
@@ -153,8 +156,8 @@
}
/// <summary>
- /// Perform re-send of all delivered but not yet acknowledged messages for all consumers
- /// active in this Session.
+ /// Perform re-send of all delivered but not yet acknowledged messages for all consumers
+ /// active in this Session.
/// </summary>
public void Recover()
{
@@ -173,8 +176,8 @@
/// Roll back the currently running Transaction
/// </summary>
/// <param name="transactionInfo">The TransactionInfo describing the transaction being rolled back.</param>
- /// <param name="nextTransactionInfo">The JmsTransactionInfo describing the transaction that should be started immediately.</param>
- /// <exception cref="IllegalStateException"></exception>
+ /// <param name="nextTransactionInfo">The TransactionInfo describing the transaction that should be started immediately.</param>
+ /// <exception cref="Exception">throws Exception if an error occurs while performing the operation.</exception>
public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
if (!SessionInfo.IsTransacted)
@@ -185,13 +188,19 @@
return TransactionContext.Rollback(transactionInfo, nextTransactionInfo);
}
+ /// <summary>
+ /// Commits the currently running Transaction by delegating to the session's transaction context.
+ /// </summary>
+ /// <param name="transactionInfo">The TransactionInfo describing the transaction being committed.</param>
+ /// <param name="nextTransactionInfo">The TransactionInfo describing the transaction that should be started immediately afterwards.</param>
+ /// <exception cref="Exception">Thrown if an error occurs while performing the operation; in particular an IllegalStateException is thrown when the session is not transacted.</exception>
public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
if (!SessionInfo.IsTransacted)
{
throw new IllegalStateException("Non-transacted Session cannot commit a TX.");
}
-
+
return TransactionContext.Commit(transactionInfo, nextTransactionInfo);
}
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
index 6934f5f..6753e4e 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
@@ -40,6 +40,8 @@
this.session = session;
}
+        /// <summary>
+        /// <c>true</c> when a coordinator link exists and <c>IsDetaching()</c> reports it as detaching;
+        /// <c>false</c> otherwise, including when no coordinator has been created yet.
+        /// </summary>
+        public bool IsTransactionFailed
+        {
+            get { return coordinator?.IsDetaching() ?? false; }
+        }
+
public TransactionalState GetTxnEnrolledState()
{
return this.cachedTransactedState;
@@ -63,8 +65,9 @@
Tracer.Debug($"TX Context{this} rolling back current TX[{this.current}]");
- await this.coordinator.DischargeAsync(this.txnId, true).ConfigureAwait(false);
this.current = null;
+ await this.coordinator.DischargeAsync(this.txnId, true).ConfigureAwait(false);
+
PostRollback();
@@ -97,8 +100,8 @@
Tracer.Debug($"TX Context{this} committing back current TX[{this.current}]");
- await this.coordinator.DischargeAsync(this.txnId, false).ConfigureAwait(false);
this.current = null;
+ await this.coordinator.DischargeAsync(this.txnId, false).ConfigureAwait(false);
PostCommit();
@@ -115,12 +118,12 @@
if (this.current != null)
throw new NMSException("Begin called while a TX is still Active.");
- if (this.coordinator == null || this.coordinator.IsClosed)
+ if (this.coordinator == null || this.coordinator.IsDetaching())
{
this.coordinator = new AmqpTransactionCoordinator(this.session);
}
- this.txnId = await this.coordinator.DeclareAsync();
+ this.txnId = await this.coordinator.DeclareAsync().ConfigureAwait(false);
this.current = transactionInfo.Id;
transactionInfo.ProviderTxId = this.txnId;
this.cachedTransactedState = new TransactionalState { TxnId = this.txnId };
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
index a832343..d26e8fb 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
@@ -26,48 +26,67 @@
{
public class AmqpTransactionCoordinator : SenderLink
{
- public AmqpTransactionCoordinator(AmqpSession session) : base(session.UnderlyingSession, GetName(), new Attach { Target = new Coordinator(), Source = new Source() }, null)
+ private static readonly global::Amqp.Message DeclareMessage = new global::Amqp.Message(new Declare());
+
+ private readonly AmqpSession session;
+
+ public AmqpTransactionCoordinator(AmqpSession session) : base(session.UnderlyingSession, GetName(), new Attach
{
+ Target = new Coordinator
+ {
+ Capabilities = new[] { TxnCapabilities.LocalTransactions }
+ },
+ Source = new Source
+ {
+ Outcomes = new[] { SymbolUtil.ATTACH_OUTCOME_ACCEPTED, SymbolUtil.ATTACH_OUTCOME_REJECTED, SymbolUtil.ATTACH_OUTCOME_RELEASED, SymbolUtil.ATTACH_OUTCOME_MODIFIED },
+ },
+ SndSettleMode = SenderSettleMode.Unsettled,
+ RcvSettleMode = ReceiverSettleMode.First
+ }, null)
+ {
+ this.session = session;
}
private static string GetName() => "transaction-link-" + Guid.NewGuid().ToString("N").Substring(0, 5);
- public Task<byte[]> DeclareAsync()
+ public async Task<byte[]> DeclareAsync()
{
- var message = new global::Amqp.Message(new Declare());
- var tcs = new TaskCompletionSource<byte[]>();
- Send(message, null, OnDeclareOutcome, tcs);
- return tcs.Task;
- }
-
- private static void OnDeclareOutcome(ILink link, global::Amqp.Message message, Outcome outcome, object state)
- {
- var tcs = (TaskCompletionSource<byte[]>) state;
+ var outcome = await this.SendAsync(DeclareMessage, null, this.session.Connection.Info.requestTimeout).ConfigureAwait(false);
if (outcome.Descriptor.Code == MessageSupport.DECLARED_INSTANCE.Descriptor.Code)
- tcs.SetResult(((Declared) outcome).TxnId);
+ {
+ return ((Declared) outcome).TxnId;
+ }
else if (outcome.Descriptor.Code == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
- tcs.SetException(new AmqpException(((Rejected) outcome).Error));
+ {
+ var rejected = (Rejected) outcome;
+ var rejectedError = rejected.Error ?? new Error(ErrorCode.InternalError);
+ throw new AmqpException(rejectedError);
+ }
else
- tcs.SetCanceled();
+ {
+ throw new NMSException(outcome.ToString(), ErrorCode.InternalError);
+ }
}
- public Task DischargeAsync(byte[] txnId, bool fail)
+ public async Task DischargeAsync(byte[] txnId, bool fail)
{
var message = new global::Amqp.Message(new Discharge { TxnId = txnId, Fail = fail });
- var tcs = new TaskCompletionSource<bool>();
- Send(message, null, OnDischargeOutcome, tcs);
- return tcs.Task;
- }
+ var outcome = await this.SendAsync(message, null, this.session.Connection.Info.requestTimeout).ConfigureAwait(false);
- private static void OnDischargeOutcome(ILink link, global::Amqp.Message message, Outcome outcome, object state)
- {
- var tcs = (TaskCompletionSource<bool>) state;
if (outcome.Descriptor.Code == MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code)
- tcs.SetResult(true);
+ {
+ // Accepted: the discharge went through, so the method simply completes without error.
+ }
else if (outcome.Descriptor.Code == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
- tcs.SetException(new AmqpException(((Rejected) outcome).Error));
+ {
+ var rejected = (Rejected) outcome;
+ var rejectedError = rejected.Error ?? new Error(ErrorCode.TransactionRollback);
+ throw new TransactionRolledBackException(rejectedError.Condition, rejectedError.Description);
+ }
else
- tcs.SetCanceled();
+ {
+ throw new NMSException(outcome.ToString(), ErrorCode.InternalError);
+ }
}
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index a3b6306..ffe6a62 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -1038,11 +1038,13 @@
ITopic topic = session.GetTopic(topicName);
// Expect a link to a topic node, which we will then refuse
- testPeer.ExpectSenderAttach(targetMatcher: source =>
+ testPeer.ExpectSenderAttach(targetMatcher: x =>
{
- Assert.AreEqual(topicName, source.Address);
- Assert.IsFalse(source.Dynamic);
- Assert.AreEqual((uint) TerminusDurability.NONE, source.Durable);
+ Target target = (Target) x;
+
+ Assert.AreEqual(topicName, target.Address);
+ Assert.IsFalse(target.Dynamic);
+ Assert.AreEqual((uint) TerminusDurability.NONE, target.Durable);
}, sourceMatcher: Assert.NotNull, refuseLink: true);
//Expect the detach response to the test peer closing the producer link after refusal.
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
index b2b2d7e..ac9f351 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
@@ -206,11 +206,16 @@
testPeer.ExpectBegin();
string queueName = "myQueue";
- Action<Target> targetMatcher = null;
- if (anonymousProducer)
- targetMatcher = target => Assert.IsNull(target.Address);
- else
- targetMatcher = target => Assert.AreEqual(queueName, target.Address);
+ Action<object> targetMatcher = t =>
+ {
+ var target = t as Target;
+ Assert.IsNotNull(target);
+ if (anonymousProducer)
+ Assert.IsNull(target.Address);
+ else
+ Assert.AreEqual(queueName, target.Address);
+ };
+
testPeer.ExpectSenderAttach(targetMatcher: targetMatcher, sourceMatcher: Assert.NotNull, senderSettled: false);
diff --git a/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs
new file mode 100644
index 0000000..9242c58
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs
@@ -0,0 +1,1473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Transactions;
+using Amqp;
+using Amqp.Framing;
+using Amqp.Transactions;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+using IConnection = Apache.NMS.IConnection;
+using ISession = Apache.NMS.ISession;
+
+namespace NMS.AMQP.Test.Integration
+{
+ [TestFixture]
+ public class TransactionsIntegrationTest : IntegrationTestFixture
+ {
+ [Test, Timeout(20_000)]
+ public void TestTransactionRolledBackOnSessionClose()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+
+ // Closing the session should roll back the open TX: the peer expects a discharge of txnId flagged as failed, followed by the session end.
+ testPeer.ExpectDischarge(txnId, true);
+ testPeer.ExpectEnd();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ session.Close();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestTransactionCommitFailWithEmptyRejectedDisposition()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // Scenario: the commit is answered with a Rejected state that carries no error. First expect an unsettled
+ // 'declare' transfer to the txn coordinator, and reply with a Declared disposition state containing txnId1.
+
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId1);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Create a producer so that a message can be sent under the current transaction
+ testPeer.ExpectSenderAttach();
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with txnId1 but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome for txnId1.
+ Action<DeliveryState> stateMatcher = state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ };
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+ {
+ Outcome = new Accepted(),
+ TxnId = txnId1
+ }, responseSettled: true);
+
+ producer.Send(session.CreateMessage());
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing txnId1 (commit, so not flagged as failed),
+ // and reply with a Rejected state that has no Error set to indicate the commit failed
+ testPeer.ExpectDischarge(txnId1, dischargeState: false, responseState: new Rejected());
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the replacement transaction, and
+ // reply with a declared disposition state containing txnId2.
+ byte[] txnId2 = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId2);
+
+ Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+ // The still-open replacement transaction (txnId2) should be rolled back when the connection is closed
+ testPeer.ExpectDischarge(txnId2, true);
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestProducedMessagesAfterCommitOfSentMessagesFails()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // Scenario: after a failed commit, a later send must be enrolled in the replacement transaction. First expect an
+ // unsettled 'declare' transfer to the txn coordinator, and reply with a Declared disposition state containing txnId1.
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId1);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Create a producer so that messages can be sent under the current transaction
+ testPeer.ExpectSenderAttach();
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // Expect the message which was sent under the first transaction. Check it carries
+ // TransactionalState with txnId1 but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome for txnId1.
+ Action<DeliveryState> stateMatcher = state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ };
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+ {
+ Outcome = new Accepted(),
+ TxnId = txnId1
+ }, responseSettled: true);
+
+ producer.Send(session.CreateMessage());
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing txnId1 (commit, so not flagged as failed),
+ // and reply with a Rejected state carrying an internal-error Error to indicate the commit failed
+ testPeer.ExpectDischarge(txnId1, false, new Rejected() { Error = new Error(ErrorCode.InternalError) { Description = "Unknown error" } });
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the replacement transaction, and
+ // reply with a declared disposition state containing txnId2.
+ byte[] txnId2 = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId2);
+
+ Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+ // Expect the message which is sent after the failed commit. Check it carries
+ // TransactionalState with txnId2 (the replacement transaction) but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome for txnId2; txnId2 is then expected to be discharged as failed when the connection closes.
+ stateMatcher = state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId2, transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ };
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+ {
+ Outcome = new Accepted(),
+ TxnId = txnId2
+ }, responseSettled: true);
+ testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+ producer.Send(session.CreateMessage());
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestProducedMessagesAfterRollbackSentMessagesFails()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // Scenario: after a failed rollback, a later send must be enrolled in the replacement transaction. First expect an
+ // unsettled 'declare' transfer to the txn coordinator, and reply with a Declared disposition state containing txnId1.
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ byte[] txnId2 = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId1);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Create a producer so that messages can be sent under the current transaction
+ testPeer.ExpectSenderAttach();
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // Expect the message which was sent under the first transaction. Check it carries
+ // TransactionalState with txnId1 but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome for txnId1.
+
+ Action<DeliveryState> stateMatcher = state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ };
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+ {
+ Outcome = new Accepted(),
+ TxnId = txnId1
+ }, responseSettled: true);
+
+ producer.Send(session.CreateMessage());
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing txnId1 (rollback, so flagged as failed),
+ // and reply with a Rejected state carrying an internal-error Error to indicate the rollback failed
+ testPeer.ExpectDischarge(txnId1, true, new Rejected() { Error = new Error(ErrorCode.InternalError) { Description = "Unknown error" } });
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the replacement transaction, and
+ // reply with a declared disposition state containing txnId2.
+ testPeer.ExpectDeclare(txnId2);
+
+ Assert.Catch<TransactionRolledBackException>(() => session.Rollback(), "Rollback operation should have failed.");
+
+ // Expect the message which is sent after the failed rollback. Check it carries
+ // TransactionalState with txnId2 (the replacement transaction) but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome for txnId2; txnId2 is then expected to be discharged as failed when the connection closes.
+ stateMatcher = state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId2, transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ };
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+ {
+ Outcome = new Accepted(),
+ TxnId = txnId2
+ }, responseSettled: true);
+ testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+ producer.Send(session.CreateMessage());
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+        // Shared commit scenario: 1 message transferred, 1 consumed, consumer close not requested.
+        [Test, Timeout(20_000)]
+        public void TestCommitTransactedSessionWithConsumerReceivingAllMessages() =>
+            DoCommitTransactedSessionWithConsumerTestImpl(transferCount: 1, consumeCount: 1, closeConsumer: false, closeBeforeCommit: false);
+
+        // Shared commit scenario: 1 message transferred, 1 consumed, consumer close requested before the commit.
+        [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+        public void TestCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseBefore() =>
+            DoCommitTransactedSessionWithConsumerTestImpl(transferCount: 1, consumeCount: 1, closeConsumer: true, closeBeforeCommit: true);
+
+        // Shared commit scenario: 1 message transferred, 1 consumed, consumer close requested but not before the commit.
+        [Test, Timeout(20_000)]
+        public void TestCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseAfter() =>
+            DoCommitTransactedSessionWithConsumerTestImpl(transferCount: 1, consumeCount: 1, closeConsumer: true, closeBeforeCommit: false);
+
+        // Shared commit scenario: 5 messages transferred, 2 consumed, consumer close not requested.
+        [Test, Timeout(20_000)]
+        public void TestCommitTransactedSessionWithConsumerReceivingSomeMessages() =>
+            DoCommitTransactedSessionWithConsumerTestImpl(transferCount: 5, consumeCount: 2, closeConsumer: false, closeBeforeCommit: false);
+
+ [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+ public void TestCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesBefore()
+ {
+ DoCommitTransactedSessionWithConsumerTestImpl(transferCount: 5, consumeCount: 2, closeConsumer: true, closeBeforeCommit: true); // 2 of 5 messages consumed, consumer closed before Commit()
+ }
+
+ [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+ public void TestCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesAfter()
+ {
+ DoCommitTransactedSessionWithConsumerTestImpl(transferCount: 5, consumeCount: 2, closeConsumer: true, closeBeforeCommit: false); // 2 of 5 messages consumed, consumer closed after Commit()
+ }
+
+ private void DoCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, bool closeConsumer, bool closeBeforeCommit) // shared scenario behind the TestCommitTransactedSessionWithConsumer* tests
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), transferCount); // the peer answers the flow with transferCount messages
+
+ for (int i = 1; i <= consumeCount; i++)
+ {
+ // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+ testPeer.ExpectDisposition(settled: true, stateMatcher: state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId, transactionalState.TxnId); // same collection assertion the producer-side tests in this file use
+ Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+ });
+ }
+
+ IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+
+ for (int i = 1; i <= consumeCount; i++)
+ {
+ IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
+ Assert.NotNull(receivedMessage);
+ Assert.IsInstanceOf<ITextMessage>(receivedMessage);
+ }
+
+ // Close-before-commit variant: the consumer is closed first, then the session commits
+ if (closeConsumer && closeBeforeCommit)
+ {
+ // Expect the client to then drain off all credit from the link.
+ testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
+
+ // Expect the messages that were not consumed to be released
+ int unconsumed = transferCount - consumeCount;
+ for (int i = 1; i <= unconsumed; i++)
+ {
+ testPeer.ExpectDispositionThatIsReleasedAndSettled();
+ }
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the commit succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the next
+ // transaction, and reply with a declared disposition state containing the (reused) txnId.
+ testPeer.ExpectDeclare(txnId);
+
+ // Now the deferred close should be performed.
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ messageConsumer.Close();
+ }
+ else
+ {
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the commit succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the next
+ // transaction, and reply with a declared disposition state containing the (reused) txnId.
+ testPeer.ExpectDeclare(txnId);
+ }
+
+ session.Commit();
+
+ if (closeConsumer && !closeBeforeCommit) // close-after-commit variant
+ {
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ // Expect the messages that were not consumed to be released
+ int unconsumed = transferCount - consumeCount;
+ for (int i = 1; i <= unconsumed; i++)
+ {
+ testPeer.ExpectDispositionThatIsReleasedAndSettled();
+ }
+
+ messageConsumer.Close();
+ }
+
+ testPeer.ExpectDischarge(txnId, dischargeState: true); // the transaction declared after the commit is rolled back on close
+ testPeer.ExpectClose();
+
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerWithNoMessageCanCloseBeforeCommit() // a consumer that received nothing is closed, then the session commits
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+
+ // TODO: the qpid-jms version of this test expects two additional link flows:
+ // 1) a drain related to the deferred consumer close; that feature is currently
+ // not implemented here.
+ // 2) a consumer pull - also not implemented.
+ // testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
+ // testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+ Assert.IsNull(messageConsumer.ReceiveNoWait()); // nothing was delivered, so nothing is pending in the transaction
+
+ messageConsumer.Close();
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the commit succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the next
+ // transaction, and reply with a declared disposition state containing the (reused) txnId.
+ testPeer.ExpectDeclare(txnId);
+ testPeer.ExpectDischarge(txnId, dischargeState: true); // rollback of the newly declared transaction; presumably matched during connection.Close()
+
+ session.Commit();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerWithNoMessageCanCloseBeforeRollback() // a consumer that received nothing is closed, then the session rolls back
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow();
+
+ // TODO: the qpid-jms version of this test expects two additional link flows:
+ // 1) a drain related to the deferred consumer close; that feature is currently
+ // not implemented here.
+ // 2) a consumer pull - also not implemented.
+ // testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
+ // testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+ Assert.IsNull(messageConsumer.ReceiveNoWait()); // nothing was delivered, so nothing is pending in the transaction
+
+ messageConsumer.Close();
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the next
+ // transaction, and reply with a declared disposition state containing the (reused) txnId.
+ testPeer.ExpectDeclare(txnId);
+ testPeer.ExpectDischarge(txnId, dischargeState: true); // second rollback, for the newly declared transaction; presumably matched during connection.Close()
+
+ session.Rollback();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestProducedMessagesOnTransactedSessionCarryTxnId() // a send on a transacted session must carry the current txnId in its delivery state
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Create the producer whose send is checked below
+ testPeer.ExpectSenderAttach();
+
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // Expect the message which was sent under the current transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
+ stateMatcher: state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ TransactionalState transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ },
+ responseState: new TransactionalState() { TxnId = txnId, Outcome = new Accepted() },
+ responseSettled: true);
+ testPeer.ExpectDischarge(txnId, dischargeState: true); // no Commit() is called, so a rollback discharge is expected; presumably on connection.Close()
+
+ producer.Send(session.CreateMessage());
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestProducedMessagesOnTransactedSessionCanBeReused() // the same IMessage instance is sent three times inside one transaction
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Create the producer whose sends are checked below
+ testPeer.ExpectSenderAttach();
+
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ // For each of the three sends, expect a transfer sent under the current transaction:
+ // it must carry TransactionalState with the above txnId but no outcome. Respond with a
+ // TransactionalState with Accepted outcome.
+ IMessage message = session.CreateMessage();
+ for (int i = 0; i < 3; i++)
+ {
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
+ stateMatcher: state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ TransactionalState transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ },
+ responseState: new TransactionalState() { TxnId = txnId, Outcome = new Accepted() },
+ responseSettled: true);
+
+ message.Properties.SetInt("sequence", i); // mutate the reused message between sends
+
+ producer.Send(message);
+ }
+
+ // Expect rollback on close without a commit call.
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+ testPeer.ExpectClose();
+
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRollbackTransactedSessionWithConsumerReceivingAllMessages()
+ {
+ DoRollbackTransactedSessionWithConsumerTestImpl(transferCount: 1, consumeCount: 1, closeConsumer: false); // single message, consumer left open
+ }
+
+ [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+ public void TestRollbackTransactedSessionWithConsumerReceivingAllMessagesThenCloses()
+ {
+ DoRollbackTransactedSessionWithConsumerTestImpl(transferCount: 1, consumeCount: 1, closeConsumer: true); // single message, consumer closed
+ }
+
+ [Test, Timeout(20_000), Ignore("TODO: Fix")]
+ public void TestRollbackTransactedSessionWithConsumerReceivingSomeMessages()
+ {
+ DoRollbackTransactedSessionWithConsumerTestImpl(transferCount: 5, consumeCount: 2, closeConsumer: false); // 2 of 5 messages consumed, consumer left open
+ }
+
+ [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+ public void TestRollbackTransactedSessionWithConsumerReceivingSomeMessagesThenCloses()
+ {
+ DoRollbackTransactedSessionWithConsumerTestImpl(transferCount: 5, consumeCount: 2, closeConsumer: true); // 2 of 5 messages consumed, consumer closed
+ }
+
+ private void DoRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, bool closeConsumer) // shared scenario behind the TestRollbackTransactedSessionWithConsumer* tests
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), transferCount); // the peer answers the flow with transferCount messages
+
+ for (int i = 1; i <= consumeCount; i++)
+ {
+ // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+ testPeer.ExpectDisposition(settled: true, stateMatcher: state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId, transactionalState.TxnId); // same collection assertion the producer-side tests in this file use
+ Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+ });
+ }
+
+ IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+
+ for (int i = 1; i <= consumeCount; i++)
+ {
+ IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
+ Assert.IsNotNull(receivedMessage);
+ Assert.IsInstanceOf<ITextMessage>(receivedMessage);
+ }
+
+ // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+ testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true, creditMatcher: c => Assert.AreEqual(0, c));
+
+ if (closeConsumer)
+ {
+ // Expect the messages that were not consumed to be released
+ int unconsumed = transferCount - consumeCount;
+ for (int i = 1; i <= unconsumed; i++)
+ {
+ testPeer.ExpectDispositionThatIsReleasedAndSettled();
+ }
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the next
+ // transaction, and reply with a declared disposition state containing the (reused) txnId.
+ testPeer.ExpectDeclare(txnId);
+
+ // Now the deferred close should be performed.
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+ messageConsumer.Close();
+ }
+ else
+ {
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator for the next
+ // transaction, and reply with a declared disposition state containing the (reused) txnId.
+ testPeer.ExpectDeclare(txnId);
+
+ // Expect the messages that were not consumed to be released
+ int unconsumed = transferCount - consumeCount;
+ for (int i = 1; i <= unconsumed; i++)
+ {
+ testPeer.ExpectDispositionThatIsReleasedAndSettled();
+ }
+
+ // Expect the consumer to be 'started' again as rollback completes
+ testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: c => Assert.Greater(c, 0));
+ }
+
+ testPeer.ExpectDischarge(txnId, dischargeState: true); // the transaction declared after the rollback is itself rolled back on close
+ session.Rollback();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ // TODO:
+ // TestRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer
+ // TestRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer
+
+ [Test, Timeout(20_000)]
+ public void TestDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() // checks the Source sent in the receiver attach of a transacted consumer
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ string queueName = "myQueue";
+ IQueue queue = session.GetQueue(queueName);
+
+ testPeer.ExpectReceiverAttach(linkNameMatcher: Assert.IsNotNull, targetMatcher: Assert.IsNotNull, sourceMatcher: source =>
+ {
+ Assert.AreEqual(queueName, source.Address);
+ Assert.IsFalse(source.Dynamic);
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_ACCEPTED); // all four outcomes must be advertised
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_REJECTED);
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_RELEASED);
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_MODIFIED);
+
+ Assert.IsInstanceOf<Modified>(source.DefaultOutcome); // default outcome: Modified with DeliveryFailed set and UndeliverableHere clear
+ Modified modified = (Modified) source.DefaultOutcome;
+ Assert.IsTrue(modified.DeliveryFailed);
+ Assert.IsFalse(modified.UndeliverableHere);
+ });
+
+ testPeer.ExpectLinkFlow();
+ testPeer.ExpectDischarge(txnId, dischargeState: true); // no Commit() is called, so a rollback discharge is expected; presumably on connection.Close()
+ session.CreateConsumer(queue);
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestCoordinatorLinkSupportedOutcomes() // checks the outcomes listed on the Source of the coordinator link attach
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach(sourceMatcher: s =>
+ {
+ Source source = (Source) s;
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_ACCEPTED); // all four outcomes must be advertised
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_REJECTED);
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_RELEASED);
+ CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_MODIFIED);
+ });
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ connection.CreateSession(AcknowledgementMode.Transactional);
+
+ // Expect rollback on close
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRollbackErrorCoordinatorClosedOnCommit() // Commit() must throw TransactionRolledBackException in the scenario scripted below
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ byte[] txnId2 = { 1, 2, 3, 4 };
+
+ testPeer.ExpectDeclare(txnId1);
+ testPeer.RemotelyCloseLastCoordinatorLinkOnDischarge(txnId: txnId1, dischargeState: false, nextTxnId: txnId2); // NOTE(review): presumably closes the coordinator link when the commit discharge for txnId1 arrives - confirm in TestAmqpPeer
+ testPeer.ExpectCoordinatorAttach(); // a fresh coordinator link is expected afterwards
+ testPeer.ExpectDeclare(txnId2);
+ testPeer.ExpectDischarge(txnId2, dischargeState: true); // rollback discharge for the second transaction
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Transaction should have rolled back");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRollbackErrorWhenCoordinatorRemotelyClosed() // Commit() must throw TransactionRolledBackException after the peer closed the coordinator link
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+ testPeer.RemotelyCloseLastCoordinatorLink(); // scripted right after the declare expectation
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ testPeer.WaitForAllMatchersToComplete(2000); // let everything scripted so far finish before scripting the recovery steps
+
+ testPeer.ExpectCoordinatorAttach(); // a fresh coordinator link and declare are expected next
+ testPeer.ExpectDeclare(txnId);
+
+ testPeer.ExpectDischarge(txnId, true); // second argument is dischargeState: a rollback discharge
+
+ Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Transaction should have rolled back");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestNMSErrorCoordinatorClosedOnRollback() // Rollback() must throw NMSException in the scenario scripted below
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ byte[] txnId2 = { 1, 2, 3, 4 };
+
+ testPeer.ExpectDeclare(txnId1);
+ testPeer.RemotelyCloseLastCoordinatorLinkOnDischarge(txnId: txnId1, dischargeState: true, nextTxnId: txnId2); // NOTE(review): presumably closes the coordinator link when the rollback discharge for txnId1 arrives - confirm in TestAmqpPeer
+ testPeer.ExpectCoordinatorAttach(); // a fresh coordinator link is expected afterwards
+ testPeer.ExpectDeclare(txnId2);
+ testPeer.ExpectDischarge(txnId2, dischargeState: true); // rollback discharge for the second transaction
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ Assert.Catch<NMSException>(() => session.Rollback(), "Rollback should have thrown a NMSException");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestNMSExceptionOnRollbackWhenCoordinatorRemotelyClosed() // Rollback() must throw NMSException after the peer closed the coordinator link
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+ testPeer.RemotelyCloseLastCoordinatorLink(); // scripted right after the declare expectation
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ testPeer.WaitForAllMatchersToComplete(2000); // let everything scripted so far finish before scripting the recovery steps
+
+ testPeer.ExpectCoordinatorAttach(); // a fresh coordinator link and declare are expected next
+ testPeer.ExpectDeclare(txnId);
+
+ testPeer.ExpectDischarge(txnId, dischargeState: true); // rollback discharge
+
+ Assert.Catch<NMSException>(() => session.Rollback(), "Rollback should have thrown a NMSException");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSendAfterCoordinatorLinkClosedDuringTX() // a send after the coordinator link was closed, followed by a Commit() that must fail
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Create the producer used for the send below
+ testPeer.ExpectSenderAttach();
+
+ // Close the coordinator link; messages sent from now on should simply be dropped
+ // (note that no transfer is expected for the send below).
+ testPeer.RemotelyCloseLastCoordinatorLink();
+
+ IMessageProducer producer = session.CreateProducer(queue);
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ producer.Send(session.CreateMessage());
+
+ // Expect that a new link will be created in order to start the next TX.
+ txnId = new byte[] { 1, 2, 3, 4 };
+ testPeer.ExpectCoordinatorAttach();
+ testPeer.ExpectDeclare(txnId);
+
+ // Expect that the session TX will roll back on close.
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestReceiveAfterCoordinatorLinkClosedDuringTX() // a receive after the coordinator link was closed, followed by a Commit() that must fail
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Create a consumer and send it an initial message for receive to process.
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithNullContent());
+
+ // Close the coordinator link; from now on the transaction is failed.
+ testPeer.RemotelyCloseLastCoordinatorLink();
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ testPeer.WaitForAllMatchersToComplete(2000);
+
+ // Receiving the message would normally ack it; since the TX has failed this
+ // should not result in a disposition going out (none is expected here).
+ IMessage received = consumer.Receive(); // NOTE(review): no receive timeout; only the [Timeout] attribute bounds this call
+ Assert.IsNotNull(received);
+
+ // Expect that a new link will be created in order to start the next TX.
+ txnId = new byte[] { 1, 2, 3, 4 };
+ testPeer.ExpectCoordinatorAttach();
+ testPeer.ExpectDeclare(txnId);
+
+ // Expect that the session TX will roll back on close.
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSessionCreateFailsOnDeclareTimeout() // CreateSession must throw when the peer never answers the declare
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500"); // short request timeout so the unanswered declare fails quickly
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+ testPeer.ExpectDeclareButDoNotRespond();
+
+ // Expect the AMQP session to be closed due to the NMS session creation failure.
+ testPeer.ExpectEnd();
+
+ // TODO: Replace NMSException with something more specific; in qpid-jms it is JmsOperationTimedOutException
+ Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional), "Should have timed out waiting for declare.");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSessionCreateFailsOnDeclareRejection() // CreateSession must throw when the peer rejects the declare
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Rejected disposition state to indicate failure.
+ testPeer.ExpectDeclareAndReject();
+
+ // Expect the AMQP session to be closed due to the NMS session creation failure.
+ testPeer.ExpectEnd();
+
+ Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional)); // the rejected declare must surface as an NMSException
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestSessionCreateFailsOnCoordinatorLinkRefusal() // CreateSession must throw, carrying the peer's error text, when the coordinator link is refused
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ // Expect the coordinator link attach, refuse it with an error, then expect the client's detach (no reply is sent)
+ string errorMessage = "CoordinatorLinkRefusal-breadcrumb";
+ testPeer.ExpectCoordinatorAttach(refuseLink: true, error: new Error(ErrorCode.NotImplemented) { Description = errorMessage });
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: false, replyClosed: false);
+
+ // Expect the AMQP session to be closed due to the NMS session creation failure.
+ testPeer.ExpectEnd();
+
+ NMSException exception = Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional));
+ Assert.IsTrue(exception.Message.Contains(errorMessage), "Expected exception message to contain breadcrumb"); // the peer's error description must reach the caller
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestTransactionRolledBackOnSessionCloseTimesOut() // session.Close() must throw when the rollback discharge is never answered
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500"); // short request timeout so the unanswered discharge fails quickly
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+
+ // Closing the session should roll back the TX; the peer receives that discharge but never responds
+ testPeer.ExpectDischargeButDoNotRespond(txnId, dischargeState: true);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ // TODO: Replace NMSException with something more specific; in qpid-jms it is JmsOperationTimedOutException
+ Assert.Catch<NMSException>(() => session.Close(), "Should have timed out waiting for discharge.");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestTransactionRolledBackTimesOut() // Rollback() must throw when its discharge is never answered
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500"); // short request timeout so the unanswered discharge fails quickly
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ byte[] txnId2 = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId1);
+
+ // Expect discharge but don't respond so that the request timeout kicks in and fails
+ // the discharge. The pipelined declare should arrive as well and be discharged as the
+ // client attempts to recover to a known good state.
+ testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: true);
+
+ // Session should throw from the rollback and then try to recover.
+ testPeer.ExpectDeclare(txnId2);
+ testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ // TODO: Replace NMSException with something more specific; in qpid-jms it is JmsOperationTimedOutException
+ Assert.Catch<NMSException>(() => session.Rollback(), "Should have timed out waiting for discharge.");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestTransactionCommitTimesOut() // Commit() must throw when its discharge is never answered
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500"); // short request timeout so the unanswered discharge fails quickly
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ byte[] txnId2 = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId1);
+
+ // Expect the commit discharge but don't respond, so that the request timeout kicks in and
+ // fails the discharge. The pipelined declare should arrive as well; the transaction it
+ // starts (txnId2) is rolled back later, when the session closes.
+ testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: false);
+ testPeer.ExpectDeclare(txnId2);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ // TODO: Replace NMSException with something more specific; in qpid-jms it is JmsOperationTimedOutException
+ Assert.Catch<NMSException>(() => session.Commit(), "Should have timed out waiting for discharge.");
+
+ // Session rolls back on close
+ testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000), Ignore("TODO: Fix")]
+ public void TestTransactionCommitTimesOutAndNoNextBeginTimesOut()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ byte[] txnId1 = { 5, 6, 7, 8 };
+ byte[] txnId2 = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId1);
+
+ // Expect discharge and don't respond so that the request timeout kicks in.
+ // Expect pipelined declare and don't respond so that the request timeout kicks in.
+ // The commit operation should throw a timed out exception at that point.
+ testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: false);
+ testPeer.ExpectDeclareButDoNotRespond();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+ // After the pipelined operations both time out, the session should attempt to
+ // recover by creating a new TX, then on close the session should roll it back
+ testPeer.ExpectDeclare(txnId2);
+ testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+ // TODO: Replace NMSException with something more specific, in qpid-jms it is JmsOperationTimedOutException
+ Assert.Catch<NMSException>(() => session.Commit(), "Should have timed out waiting for discharge.");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000), Ignore("TODO: Fix")]
+ public void TestRollbackWithNoResponseForSuspendConsumer()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+ // Then expect a *settled* TransactionalState disposition for the message once received by the consumer
+ testPeer.ExpectDisposition(settled: true, state =>
+ {
+ var transactionalState = (TransactionalState) state;
+ CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+ Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+ });
+
+ // Read one so we try to suspend on rollback
+ IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+ IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
+
+ Assert.NotNull(receivedMessage);
+ Assert.IsInstanceOf<ITextMessage>(receivedMessage);
+
+ // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+ testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: false);
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ testPeer.ExpectDeclare(txnId);
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ Assert.Catch<NMSException>(() => session.Rollback(), "Should throw a timed out exception");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerMessageOrderOnTransactedSession()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ int messageCount = 10;
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = { 5, 6, 7, 8 };
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Expect creating the consumer to attach an underlying receiver link
+ testPeer.ExpectReceiverAttach();
+
+ // Expect initial credit to be sent, respond with some messages that are tagged with
+ // a sequence number we can use to determine if order is maintained.
+ testPeer.ExpectLinkFlowRespondWithTransfer(CreateMessageWithNullContent(), count: messageCount, addMessageNumberProperty: true);
+
+ for (int i = 1; i <= messageCount; i++)
+ {
+ // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+ testPeer.ExpectSettledTransactionalDisposition(txnId);
+ }
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ for (int i = 0; i < messageCount; i++)
+ {
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(500));
+ Assert.IsNotNull(message);
+ Assert.AreEqual(i, message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER));
+ }
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ testPeer.ExpectDischarge(txnId, true);
+ testPeer.ExpectEnd();
+
+ session.Close();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumeManyWithSingleTXPerMessage()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ int messageCount = 10;
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+
+ var txnIdQueue = new Queue<byte[]>(3);
+ txnIdQueue.Enqueue(new byte[] { 1, 2, 3, 4 });
+ txnIdQueue.Enqueue(new byte[] { 2, 4, 6, 8 });
+ txnIdQueue.Enqueue(new byte[] { 5, 4, 3, 2 });
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ byte[] txnId = txnIdQueue.Dequeue();
+ txnIdQueue.Enqueue(txnId);
+ testPeer.ExpectDeclare(txnId);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue("myQueue");
+
+ // Expect creating the consumer to attach an underlying receiver link
+ testPeer.ExpectReceiverAttach();
+
+ // Expect initial credit to be sent, respond with some messages that are tagged with
+ // a sequence number we can use to determine if order is maintained.
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithNullContent(), count: messageCount, addMessageNumberProperty: true);
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+ testPeer.ExpectSettledTransactionalDisposition(txnId);
+
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(500));
+ Assert.NotNull(message);
+ Assert.AreEqual(i, message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER));
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the commit succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+ // Expect the next transaction to start.
+ txnId = txnIdQueue.Dequeue();
+ txnIdQueue.Enqueue(txnId);
+ testPeer.ExpectDeclare(txnId);
+
+ session.Commit();
+ }
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+ testPeer.ExpectEnd();
+
+ session.Close();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 71f7f02..6c2f147 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -26,6 +26,7 @@
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
+using Amqp.Transactions;
using Amqp.Types;
using Apache.NMS.AMQP.Util;
using NLog;
@@ -55,6 +56,7 @@
private ushort lastInitiatedChannel = 0;
private uint lastInitiatedLinkHandle;
+ private uint lastInitiatedCoordinatorLinkHandle = 0;
private readonly LinkedList<IMatcher> matchers = new LinkedList<IMatcher>();
private readonly object matchersLock = new object();
@@ -273,7 +275,7 @@
);
}
- public void ExpectLinkFlowRespondWithTransfer(Amqp.Message message, int count = 1)
+ public void ExpectLinkFlowRespondWithTransfer(Amqp.Message message, int count = 1, bool addMessageNumberProperty = false)
{
Action<uint> creditMatcher = credit => Assert.Greater(credit, 0);
@@ -283,7 +285,7 @@
drain: false,
sendDrainFlowResponse: false,
sendSettled: false,
- addMessageNumberProperty: false,
+ addMessageNumberProperty: addMessageNumberProperty,
creditMatcher: creditMatcher,
nextIncomingId: 1
);
@@ -306,7 +308,7 @@
FrameMatcher<Flow> flowMatcher = new FrameMatcher<Flow>()
.WithAssertion(flow => Assert.AreEqual(drain, flow.Drain))
- .WithAssertion(flow => creditMatcher(flow.LinkCredit));
+ .WithAssertion(flow => creditMatcher?.Invoke(flow.LinkCredit));
if (nextIncomingId != null)
{
@@ -419,19 +421,20 @@
ExpectSenderAttach(sourceMatcher: Assert.NotNull, targetMatcher: Assert.NotNull);
}
- public void ExpectSenderAttach(Action<Source> sourceMatcher,
- Action<Target> targetMatcher,
+ public void ExpectSenderAttach(Action<object> sourceMatcher,
+ Action<object> targetMatcher,
bool refuseLink = false,
uint creditAmount = 100,
- bool senderSettled = false)
+ bool senderSettled = false,
+ Error error = null)
{
var attachMatcher = new FrameMatcher<Attach>()
.WithAssertion(attach => Assert.IsNotNull(attach.LinkName))
.WithAssertion(attach => Assert.AreEqual(attach.Role, Role.SENDER))
.WithAssertion(attach => Assert.AreEqual(senderSettled ? SenderSettleMode.Settled : SenderSettleMode.Unsettled, attach.SndSettleMode))
.WithAssertion(attach => Assert.AreEqual(attach.RcvSettleMode, ReceiverSettleMode.First))
- .WithAssertion(attach => sourceMatcher(attach.Source as Source))
- .WithAssertion(attach => targetMatcher(attach.Target as Target))
+ .WithAssertion(attach => sourceMatcher(attach.Source))
+ .WithAssertion(attach => targetMatcher(attach.Target))
.WithOnComplete(context =>
{
var attach = new Attach()
@@ -452,11 +455,21 @@
lastInitiatedLinkHandle = context.Command.Handle;
+ if (context.Command.Target is Coordinator)
+ {
+ lastInitiatedCoordinatorLinkHandle = context.Command.Handle;
+ }
+
context.SendCommand(attach);
if (refuseLink)
{
var detach = new Detach { Closed = true, Handle = context.Command.Handle };
+ if (error != null)
+ {
+ detach.Error = error;
+ }
+
context.SendCommand(detach);
}
else
@@ -684,6 +697,19 @@
AddMatcher(dispositionMatcher);
}
+ public void ExpectSettledTransactionalDisposition(byte[] txnId)
+ {
+ void StateMatcher(DeliveryState state)
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ var transactionalState = (TransactionalState) state;
+ Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+ CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+ }
+
+ ExpectDisposition(settled: true, StateMatcher);
+ }
+
public void ExpectTransferButDoNotRespond(Action<Amqp.Message> messageMatcher)
{
ExpectTransfer(messageMatcher: messageMatcher,
@@ -705,6 +731,17 @@
);
}
+ public void ExpectTransfer(Action<Amqp.Message> messageMatcher, Action<DeliveryState> stateMatcher, DeliveryState responseState, bool responseSettled)
+ {
+ ExpectTransfer(messageMatcher: messageMatcher,
+ stateMatcher: stateMatcher,
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: responseState,
+ responseSettled: responseSettled
+ );
+ }
+
public void ExpectTransfer(Action<Amqp.Message> messageMatcher,
Action<DeliveryState> stateMatcher,
bool settled,
@@ -862,7 +899,124 @@
responseSourceOverride: responseSourceOverride,
desiredCapabilitiesMatcher: linkDesiredCapabilitiesMatcher);
}
-
+
+ public void ExpectCoordinatorAttach(Action<object> sourceMatcher = null, bool refuseLink = false, Error error = null)
+ {
+ Action<object> coordinatorMatcher = Assert.IsInstanceOf<Coordinator>;
+ sourceMatcher = sourceMatcher ?? Assert.IsNotNull;
+
+ ExpectSenderAttach(sourceMatcher: sourceMatcher, targetMatcher: coordinatorMatcher, refuseLink: refuseLink, error: error);
+ }
+
+ public void ExpectDeclare(byte[] txnId)
+ {
+ ExpectTransfer(messageMatcher: DeclareMatcher, stateMatcher: Assert.IsNull, settled: false, sendResponseDisposition: true, responseState: new Declared() { TxnId = txnId },
+ responseSettled: true);
+ }
+
+ public void ExpectDeclareButDoNotRespond()
+ {
+ ExpectTransfer(messageMatcher: DeclareMatcher, stateMatcher: Assert.IsNull, settled: false, sendResponseDisposition: false, responseState: null, responseSettled: false);
+ }
+
+
+ public void ExpectDeclareAndReject()
+ {
+ ExpectTransfer(messageMatcher: DeclareMatcher, stateMatcher: Assert.IsNull, responseState: new Rejected(), responseSettled: true);
+ }
+
+ private void DeclareMatcher(Amqp.Message message) // asserts the transfer body is an AmqpValue wrapping a Declare
+ {
+ Assert.IsNotNull(message);
+ Assert.IsInstanceOf<AmqpValue>(message.BodySection);
+ Assert.IsInstanceOf<Declare>(((AmqpValue) message.BodySection).Value);
+ }
+
+ public void ExpectDischarge(byte[] txnId, bool dischargeState)
+ {
+ ExpectDischarge(txnId: txnId, dischargeState: dischargeState, new Accepted());
+ }
+
+ public void ExpectDischargeButDoNotRespond(byte[] txnId, bool dischargeState)
+ {
+ ExpectTransfer(messageMatcher: m => DischargeMatcher(m, txnId, dischargeState),
+ stateMatcher: Assert.IsNull,
+ settled: false,
+ sendResponseDisposition: false,
+ responseState: null,
+ responseSettled: true);
+ }
+
+ public void ExpectDischarge(byte[] txnId, bool dischargeState, DeliveryState responseState)
+ {
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with given response and settled disposition to indicate the outcome.
+ ExpectTransfer(messageMatcher: m => DischargeMatcher(m, txnId, dischargeState),
+ stateMatcher: Assert.IsNull,
+ settled: false,
+ sendResponseDisposition: true,
+ responseState: responseState,
+ responseSettled: true);
+ }
+
+ private void DischargeMatcher(Amqp.Message message, byte[] txnId, bool dischargeState) // asserts the body is a Discharge with the given txnId and fail flag
+ {
+ Assert.IsNotNull(message);
+ Assert.IsInstanceOf<AmqpValue>(message.BodySection);
+ Assert.IsInstanceOf<Discharge>(((AmqpValue) message.BodySection).Value); // assert the type: an unchecked 'as' cast would turn a wrong body into a NullReferenceException
+ var discharge = (Discharge) ((AmqpValue) message.BodySection).Value;
+ Assert.AreEqual(dischargeState, discharge.Fail);
+ CollectionAssert.AreEqual(txnId, discharge.TxnId);
+ }
+
+ public void RemotelyCloseLastCoordinatorLinkOnDischarge(byte[] txnId, bool dischargeState, byte[] nextTxnId)
+ {
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, but send no
+ // response disposition; the coordinator link is closed with an error instead. NOTE(review): nextTxnId is currently unused.
+ void DischargeMatcher(Amqp.Message message)
+ {
+ Assert.IsNotNull(message);
+ Assert.IsInstanceOf<AmqpValue>(message.BodySection);
+ Assert.IsInstanceOf<Discharge>(((AmqpValue) message.BodySection).Value); // assert the type: an unchecked 'as' cast would turn a wrong body into a NullReferenceException
+ var discharge = (Discharge) ((AmqpValue) message.BodySection).Value;
+ Assert.AreEqual(dischargeState, discharge.Fail);
+ CollectionAssert.AreEqual(txnId, discharge.TxnId);
+ }
+
+ ExpectTransfer(messageMatcher: DischargeMatcher, stateMatcher: Assert.IsNull, settled: false, sendResponseDisposition: false, responseState: null, responseSettled: false);
+
+ RemotelyCloseLastCoordinatorLink(expectDetachResponse: true, closed: true, error: new Error(ErrorCode.TransactionRollback) { Description = "Discharge of TX failed." });
+ }
+
+ public void RemotelyCloseLastCoordinatorLink()
+ {
+ RemotelyCloseLastCoordinatorLink(expectDetachResponse: true, closed: true, error: new Error(ErrorCode.TransactionRollback) { Description = "Discharge of TX failed." });
+ }
+
+ private void RemotelyCloseLastCoordinatorLink(bool expectDetachResponse, bool closed, Error error)
+ {
+ lock (matchersLock) // the detach is hooked onto whatever GetLastMatcher() returns, so it is sent from that matcher's completion callback
+ {
+ var matcher = GetLastMatcher();
+ matcher.WithOnComplete(context =>
+ {
+ var detach = new Detach { Closed = closed, Handle = lastInitiatedCoordinatorLinkHandle }; // honour 'closed' instead of hard-coding true, matching the response expected below
+ if (error != null)
+ {
+ detach.Error = error;
+ }
+
+ context.SendCommand(detach);
+ });
+
+ if (expectDetachResponse) // optionally expect the client to answer with a Detach carrying the same 'closed' flag
+ {
+ var detachMatcher = new FrameMatcher<Detach>().WithAssertion(detach => Assert.AreEqual(closed, detach.Closed));
+ AddMatcher(detachMatcher);
+ }
+ }
+ }
+
public void PurgeExpectations()
{
lock (matchersLock)