Merge pull request #36 from Havret/transactions_integration_tests

AMQNET-627: Transactions implementation adjustments
diff --git a/apache-nms-amqp.sln.DotSettings b/apache-nms-amqp.sln.DotSettings
index 8e25730..5c57e20 100644
--- a/apache-nms-amqp.sln.DotSettings
+++ b/apache-nms-amqp.sln.DotSettings
@@ -1,3 +1,4 @@
 <wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
 	<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JMS/@EntryIndexedValue">JMS</s:String>
+	<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=TX/@EntryIndexedValue">TX</s:String></wpf:ResourceDictionary>	
 	<s:Boolean x:Key="/Default/UserDictionary/Words/=Amqp/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index f291f98..fdf0f49 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -49,8 +49,8 @@
     </PropertyGroup>
 
     <ItemGroup>
-        <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt"/>
-        <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt"/>
+        <None Include="..\..\LICENSE.txt" Pack="true" PackagePath="LICENSE.txt" />
+        <None Include="..\..\NOTICE.txt" Pack="true" PackagePath="NOTICE.txt" />
     </ItemGroup>
 
     <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
@@ -74,8 +74,8 @@
 
     <ItemGroup>
         <!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
-        <PackageReference Include="AMQPNetLite.Core" Version="2.2.0"/>
-        <PackageReference Include="Apache.NMS" Version="1.8.0"/>
-        <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0"/>
+        <PackageReference Include="AMQPNetLite.Core" Version="2.3.0" />
+        <PackageReference Include="Apache.NMS" Version="1.8.0" />
+        <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
     </ItemGroup>
 </Project>
diff --git a/src/NMS.AMQP/Message/InboundMessageDispatch.cs b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
index e069a64..a611614 100644
--- a/src/NMS.AMQP/Message/InboundMessageDispatch.cs
+++ b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
@@ -28,5 +28,10 @@
 
         public int RedeliveryCount => Message?.Facade.RedeliveryCount ?? 0;
         public bool EnqueueFirst { get; set; }
+
+        public override string ToString()
+        {
+            return $"[{nameof(InboundMessageDispatch)}] MessageId: {Message.NMSMessageId}, {nameof(ConsumerId)}: {ConsumerId}";
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsLocalTransactionContext.cs b/src/NMS.AMQP/NmsLocalTransactionContext.cs
index 998c8de..76c2be4 100644
--- a/src/NMS.AMQP/NmsLocalTransactionContext.cs
+++ b/src/NMS.AMQP/NmsLocalTransactionContext.cs
@@ -157,7 +157,7 @@
 
             try
             {
-                await this.connection.Commit(this.transactionInfo, nextTx);
+                await this.connection.Commit(this.transactionInfo, nextTx).ConfigureAwait(false);
                 OnTransactionCommitted();
                 Reset();
                 this.transactionInfo = nextTx;
@@ -167,6 +167,10 @@
                 Tracer.Info($"Commit failed for transaction :{oldTransactionId}");
                 throw;
             }
+            catch (Exception e)
+            {
+                throw NMSExceptionSupport.Create(e);
+            }
             finally
             {
                 try
@@ -176,7 +180,7 @@
                     // one to recover our state.
                     if (nextTx.ProviderTxId == null)
                     {
-                        await Begin();
+                        await Begin().ConfigureAwait(false);
                     }
                 }
                 catch (Exception e)
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 9904098..f8930f1 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -67,7 +67,25 @@
         internal async Task Begin()
         {
             await Connection.CreateResource(SessionInfo).ConfigureAwait(false);
-            await TransactionContext.Begin().ConfigureAwait(false);
+
+            try
+            {
+                // We always keep an open TX if transacted so start now.
+                await TransactionContext.Begin().ConfigureAwait(false);
+            }
+            catch (Exception)
+            {
+                // failed, close the AMQP session before we throw
+                try
+                {
+                    await Connection.DestroyResource(SessionInfo).ConfigureAwait(false);
+                }
+                catch (Exception)
+                {
+                    // Ignore, throw original error
+                }
+                throw;
+            }
         }
 
         public void Close()
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index b342750..685d9a3 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -239,17 +239,7 @@
                         envelope.IsDelivered = true;
                         break;
                     case AckType.ACCEPTED:
-                        AmqpTransactionContext transactionalState = session.TransactionContext;
-                        if (transactionalState != null)
-                        {
-                            receiverLink.Complete(message, transactionalState.GetTxnAcceptState());
-                            transactionalState.RegisterTxConsumer(this);
-                        }
-                        else
-                        {
-                            receiverLink.Accept(message);
-                        }
-                        RemoveMessage(envelope);
+                        HandleAccepted(envelope, message);
                         break;
                     case AckType.RELEASED:
                         receiverLink.Release(message);
@@ -260,8 +250,8 @@
                         RemoveMessage(envelope);
                         break;
                     default:
-                        Tracer.Error($"Unsupported Ack Type for message: {envelope.Message}");
-                        throw new ArgumentException($"Unsupported Ack Type for message: {envelope.Message}");
+                        Tracer.ErrorFormat("Unsupported Ack Type for message: {0}", envelope);
+                        throw new ArgumentException($"Unsupported Ack Type for message: {envelope}");
                 }
             }
             else
@@ -270,6 +260,32 @@
             }
         }
 
+        private void HandleAccepted(InboundMessageDispatch envelope, global::Amqp.Message message)
+        {
+            Tracer.DebugFormat("Accepted Ack of message: {0}", envelope);
+            
+            if (session.IsTransacted)
+            {
+                if (!session.IsTransactionFailed)
+                {
+                    var transactionalState = session.TransactionContext;
+                    receiverLink.Complete(message, transactionalState.GetTxnAcceptState());
+                    transactionalState.RegisterTxConsumer(this);
+                }
+                else
+                {
+                    Tracer.DebugFormat("Skipping ack of message {0} in failed transaction.", envelope);
+                }
+                
+            }
+            else
+            {
+                receiverLink.Accept(message);
+            }
+
+            RemoveMessage(envelope);
+        }
+
         private void AddMessage(InboundMessageDispatch envelope)
         {
             lock (syncRoot)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpLinkExtensions.cs b/src/NMS.AMQP/Provider/Amqp/AmqpLinkExtensions.cs
new file mode 100644
index 0000000..6f850b1
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpLinkExtensions.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading.Tasks;
+using Amqp;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+    internal static class AmqpLinkExtensions
+    {
+        internal static bool IsDetaching(this Link link)
+        {
+            return link.LinkState >= LinkState.DetachPipe;
+        }
+
+        internal static Task<Outcome> SendAsync(this SenderLink link, global::Amqp.Message message, DeliveryState deliveryState, long timeoutMillis)
+        {
+            return new AmqpSendTask(link, message, deliveryState, timeoutMillis).Task;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index d3982dd..3eb8e01 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -130,7 +130,11 @@
 
                 try
                 {
-                    
+                    // If the transaction has failed due to remote termination etc then we just indicate
+                    // the send has succeeded until the a new transaction is started.
+                    if (session.IsTransacted && session.IsTransactionFailed)
+                        return;
+
                     var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
 
                     if (envelope.SendAsync)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs
new file mode 100644
index 0000000..f090edd
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp;
+using Amqp.Framing;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+    internal class AmqpSendTask : TaskCompletionSource<Outcome>
+    {
+        private readonly Timer timer;
+        
+        public AmqpSendTask(SenderLink link, global::Amqp.Message message, DeliveryState deliveryState, long timeoutMillis)
+        {
+            if (timeoutMillis != ConnectionInfo.INFINITE)
+            {
+                this.timer = new Timer(OnTimer, this, timeoutMillis, -1);
+            }
+            
+            try
+            {
+                link.Send(message, deliveryState, OnOutcome, this);
+            }
+            catch (Exception e)
+            {
+                this.timer?.Dispose();
+                this.SetException(ExceptionSupport.Wrap(e));
+            }
+        }
+        
+        private static void OnOutcome(ILink link, global::Amqp.Message message, Outcome outcome, object state)
+        {
+            var thisPtr = (AmqpSendTask) state;
+            thisPtr.timer?.Dispose();
+            thisPtr.TrySetResult(outcome);
+        }
+        
+        private static void OnTimer(object state)
+        {
+            var thisPtr = (AmqpSendTask) state;
+            thisPtr.timer.Dispose();
+            thisPtr.TrySetException(new TimeoutException());
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
index ee81348..2fb8ecb 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
@@ -53,6 +53,10 @@
         public IEnumerable<AmqpConsumer> Consumers => consumers.Values.ToArray();
         public Id SessionId => SessionInfo.Id;
 
+        internal bool IsTransacted => SessionInfo.IsTransacted;
+
+        internal bool IsTransactionFailed => TransactionContext?.IsTransactionFailed ?? false;
+
         public Task Start()
         {
             TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -82,7 +86,6 @@
         public void Close()
         {
             TimeSpan timeout = TimeSpan.FromMilliseconds(SessionInfo.closeTimeout);
-            TransactionContext?.Close(timeout);
             UnderlyingSession.Close(timeout);
             Connection.RemoveSession(SessionInfo.Id);
         }
@@ -153,8 +156,8 @@
         }
 
         /// <summary>
-        ///     Perform re-send of all delivered but not yet acknowledged messages for all consumers
-        ///     active in this Session.
+        /// Perform re-send of all delivered but not yet acknowledged messages for all consumers
+        /// active in this Session.
         /// </summary>
         public void Recover()
         {
@@ -173,8 +176,8 @@
         /// Roll back the currently running Transaction
         /// </summary>
         /// <param name="transactionInfo">The TransactionInfo describing the transaction being rolled back.</param>
-        /// <param name="nextTransactionInfo">The JmsTransactionInfo describing the transaction that should be started immediately.</param>
-        /// <exception cref="IllegalStateException"></exception>
+        /// <param name="nextTransactionInfo">The TransactionInfo describing the transaction that should be started immediately.</param>
+        /// <exception cref="Exception">throws Exception if an error occurs while performing the operation.</exception>
         public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
         {
             if (!SessionInfo.IsTransacted)
@@ -185,13 +188,19 @@
             return TransactionContext.Rollback(transactionInfo, nextTransactionInfo);
         }
 
+        /// <summary>
+        /// Commit the currently running Transaction.
+        /// </summary>
+        /// <param name="transactionInfo">the TransactionInfo describing the transaction being committed.</param>
+        /// <param name="nextTransactionInfo">the TransactionInfo describing the transaction that should be started immediately.</param>
+        /// <exception cref="Exception">throws Exception if an error occurs while performing the operation.</exception>
         public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
         {
             if (!SessionInfo.IsTransacted)
             {
                 throw new IllegalStateException("Non-transacted Session cannot commit a TX.");
             }
-            
+
             return TransactionContext.Commit(transactionInfo, nextTransactionInfo);
         }
     }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
index 6934f5f..6753e4e 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
@@ -40,6 +40,8 @@
             this.session = session;
         }
 
+        public bool IsTransactionFailed => coordinator != null && coordinator.IsDetaching();
+        
         public TransactionalState GetTxnEnrolledState()
         {
             return this.cachedTransactedState;
@@ -63,8 +65,9 @@
 
             Tracer.Debug($"TX Context{this} rolling back current TX[{this.current}]");
 
-            await this.coordinator.DischargeAsync(this.txnId, true).ConfigureAwait(false);
             this.current = null;
+            await this.coordinator.DischargeAsync(this.txnId, true).ConfigureAwait(false);
+            
 
             PostRollback();
 
@@ -97,8 +100,8 @@
 
             Tracer.Debug($"TX Context{this} committing back current TX[{this.current}]");
 
-            await this.coordinator.DischargeAsync(this.txnId, false).ConfigureAwait(false);
             this.current = null;
+            await this.coordinator.DischargeAsync(this.txnId, false).ConfigureAwait(false);
 
             PostCommit();
 
@@ -115,12 +118,12 @@
             if (this.current != null)
                 throw new NMSException("Begin called while a TX is still Active.");
 
-            if (this.coordinator == null || this.coordinator.IsClosed)
+            if (this.coordinator == null || this.coordinator.IsDetaching())
             {
                 this.coordinator = new AmqpTransactionCoordinator(this.session);
             }
 
-            this.txnId = await this.coordinator.DeclareAsync();
+            this.txnId = await this.coordinator.DeclareAsync().ConfigureAwait(false);
             this.current = transactionInfo.Id;
             transactionInfo.ProviderTxId = this.txnId;
             this.cachedTransactedState = new TransactionalState { TxnId = this.txnId };
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
index a832343..d26e8fb 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
@@ -26,48 +26,67 @@
 {
     public class AmqpTransactionCoordinator : SenderLink
     {
-        public AmqpTransactionCoordinator(AmqpSession session) : base(session.UnderlyingSession, GetName(), new Attach { Target = new Coordinator(), Source = new Source() }, null)
+        private static readonly global::Amqp.Message DeclareMessage = new global::Amqp.Message(new Declare());
+
+        private readonly AmqpSession session;
+
+        public AmqpTransactionCoordinator(AmqpSession session) : base(session.UnderlyingSession, GetName(), new Attach
         {
+            Target = new Coordinator
+            {
+                Capabilities = new[] { TxnCapabilities.LocalTransactions }
+            },
+            Source = new Source
+            {
+                Outcomes = new[] { SymbolUtil.ATTACH_OUTCOME_ACCEPTED, SymbolUtil.ATTACH_OUTCOME_REJECTED, SymbolUtil.ATTACH_OUTCOME_RELEASED, SymbolUtil.ATTACH_OUTCOME_MODIFIED },
+            },
+            SndSettleMode = SenderSettleMode.Unsettled,
+            RcvSettleMode = ReceiverSettleMode.First
+        }, null)
+        {
+            this.session = session;
         }
 
         private static string GetName() => "transaction-link-" + Guid.NewGuid().ToString("N").Substring(0, 5);
 
-        public Task<byte[]> DeclareAsync()
+        public async Task<byte[]> DeclareAsync()
         {
-            var message = new global::Amqp.Message(new Declare());
-            var tcs = new TaskCompletionSource<byte[]>();
-            Send(message, null, OnDeclareOutcome, tcs);
-            return tcs.Task;
-        }
-
-        private static void OnDeclareOutcome(ILink link, global::Amqp.Message message, Outcome outcome, object state)
-        {
-            var tcs = (TaskCompletionSource<byte[]>) state;
+            var outcome = await this.SendAsync(DeclareMessage, null, this.session.Connection.Info.requestTimeout).ConfigureAwait(false);
             if (outcome.Descriptor.Code == MessageSupport.DECLARED_INSTANCE.Descriptor.Code)
-                tcs.SetResult(((Declared) outcome).TxnId);
+            {
+                return ((Declared) outcome).TxnId;
+            }
             else if (outcome.Descriptor.Code == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
-                tcs.SetException(new AmqpException(((Rejected) outcome).Error));
+            {
+                var rejected = (Rejected) outcome;
+                var rejectedError = rejected.Error ?? new Error(ErrorCode.InternalError);
+                throw new AmqpException(rejectedError);
+            }
             else
-                tcs.SetCanceled();
+            {
+                throw new NMSException(outcome.ToString(), ErrorCode.InternalError);
+            }
         }
 
-        public Task DischargeAsync(byte[] txnId, bool fail)
+        public async Task DischargeAsync(byte[] txnId, bool fail)
         {
             var message = new global::Amqp.Message(new Discharge { TxnId = txnId, Fail = fail });
-            var tcs = new TaskCompletionSource<bool>();
-            Send(message, null, OnDischargeOutcome, tcs);
-            return tcs.Task;
-        }
+            var outcome = await this.SendAsync(message, null, this.session.Connection.Info.requestTimeout).ConfigureAwait(false);
 
-        private static void OnDischargeOutcome(ILink link, global::Amqp.Message message, Outcome outcome, object state)
-        {
-            var tcs = (TaskCompletionSource<bool>) state;
             if (outcome.Descriptor.Code == MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code)
-                tcs.SetResult(true);
+            {
+                // accepted, do nothing
+            }
             else if (outcome.Descriptor.Code == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
-                tcs.SetException(new AmqpException(((Rejected) outcome).Error));
+            {
+                var rejected = (Rejected) outcome;
+                var rejectedError = rejected.Error ?? new Error(ErrorCode.TransactionRollback);
+                throw new TransactionRolledBackException(rejectedError.Condition, rejectedError.Description);
+            }
             else
-                tcs.SetCanceled();
+            {
+                throw new NMSException(outcome.ToString(), ErrorCode.InternalError);
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index a3b6306..ffe6a62 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -1038,11 +1038,13 @@
                 ITopic topic = session.GetTopic(topicName);
 
                 // Expect a link to a topic node, which we will then refuse
-                testPeer.ExpectSenderAttach(targetMatcher: source =>
+                testPeer.ExpectSenderAttach(targetMatcher: x =>
                 {
-                    Assert.AreEqual(topicName, source.Address);
-                    Assert.IsFalse(source.Dynamic);
-                    Assert.AreEqual((uint) TerminusDurability.NONE, source.Durable);
+                    Target target = (Target) x;
+
+                    Assert.AreEqual(topicName, target.Address);
+                    Assert.IsFalse(target.Dynamic);
+                    Assert.AreEqual((uint) TerminusDurability.NONE, target.Durable);
                 }, sourceMatcher: Assert.NotNull, refuseLink: true);
 
                 //Expect the detach response to the test peer closing the producer link after refusal.
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
index b2b2d7e..ac9f351 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationTest.cs
@@ -206,11 +206,16 @@
                 testPeer.ExpectBegin();
 
                 string queueName = "myQueue";
-                Action<Target> targetMatcher = null;
-                if (anonymousProducer)
-                    targetMatcher = target => Assert.IsNull(target.Address);
-                else
-                    targetMatcher = target => Assert.AreEqual(queueName, target.Address);
+                Action<object> targetMatcher = t =>
+                {
+                    var target = t as Target;
+                    Assert.IsNotNull(target);
+                    if (anonymousProducer)
+                        Assert.IsNull(target.Address);
+                    else
+                        Assert.AreEqual(queueName, target.Address);
+                };
+                
 
                 testPeer.ExpectSenderAttach(targetMatcher: targetMatcher, sourceMatcher: Assert.NotNull, senderSettled: false);
 
diff --git a/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs
new file mode 100644
index 0000000..9242c58
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs
@@ -0,0 +1,1473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Transactions;
+using Amqp;
+using Amqp.Framing;
+using Amqp.Transactions;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+using IConnection = Apache.NMS.IConnection;
+using ISession = Apache.NMS.ISession;
+
+namespace NMS.AMQP.Test.Integration
+{
+    [TestFixture]
+    public class TransactionsIntegrationTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestTransactionRolledBackOnSessionClose()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+
+                // Closed session should roll-back the TX with a failed discharge
+                testPeer.ExpectDischarge(txnId, true);
+                testPeer.ExpectEnd();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                session.Close();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestTransactionCommitFailWithEmptyRejectedDisposition()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a Declared disposition state containing the txnId.
+
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId1);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Create a producer to use in provoking creation of the AMQP transaction
+                testPeer.ExpectSenderAttach();
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+                Action<DeliveryState> stateMatcher = state =>
+                {
+                    Assert.IsInstanceOf<TransactionalState>(state);
+                    var transactionalState = (TransactionalState) state;
+                    CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
+                    Assert.IsNull(transactionalState.Outcome);
+                };
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+                {
+                    Outcome = new Accepted(),
+                    TxnId = txnId1
+                }, responseSettled: true);
+
+                producer.Send(session.CreateMessage());
+
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with rejected and settled disposition to indicate the commit failed
+                testPeer.ExpectDischarge(txnId1, dischargeState: false, responseState: new Rejected());
+
+                // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId2 = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId2);
+
+                Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+                // session should roll back on close
+                testPeer.ExpectDischarge(txnId2, true);
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestProducedMessagesAfterCommitOfSentMessagesFails()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a Declared disposition state containing the txnId.
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId1);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Create a producer to use in provoking creation of the AMQP transaction
+                testPeer.ExpectSenderAttach();
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+                Action<DeliveryState> stateMatcher = state =>
+                {
+                    Assert.IsInstanceOf<TransactionalState>(state);
+                    var transactionalState = (TransactionalState) state;
+                    CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
+                    Assert.IsNull(transactionalState.Outcome);
+                };
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+                {
+                    Outcome = new Accepted(),
+                    TxnId = txnId1
+                }, responseSettled: true);
+
+                producer.Send(session.CreateMessage());
+
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with rejected and settled disposition to indicate the commit failed
+                testPeer.ExpectDischarge(txnId1, false, new Rejected() { Error = new Error(ErrorCode.InternalError) { Description = "Unknown error" } });
+
+                // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId2 = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId2);
+
+                Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+                stateMatcher = state =>
+                {
+                    Assert.IsInstanceOf<TransactionalState>(state);
+                    var transactionalState = (TransactionalState) state;
+                    CollectionAssert.AreEqual(txnId2, transactionalState.TxnId);
+                    Assert.IsNull(transactionalState.Outcome);
+                };
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+                {
+                    Outcome = new Accepted(),
+                    TxnId = txnId2
+                }, responseSettled: true);
+                testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+                producer.Send(session.CreateMessage());
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestProducedMessagesAfterRollbackSentMessagesFails()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a Declared disposition state containing the txnId.
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                byte[] txnId2 = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId1);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Create a producer to use in provoking creation of the AMQP transaction
+                testPeer.ExpectSenderAttach();
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+
+                Action<DeliveryState> stateMatcher = state =>
+                {
+                    Assert.IsInstanceOf<TransactionalState>(state);
+                    var transactionalState = (TransactionalState) state;
+                    CollectionAssert.AreEqual(txnId1, transactionalState.TxnId);
+                    Assert.IsNull(transactionalState.Outcome);
+                };
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+                {
+                    Outcome = new Accepted(),
+                    TxnId = txnId1
+                }, responseSettled: true);
+
+                producer.Send(session.CreateMessage());
+
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with rejected and settled disposition to indicate the rollback failed
+                testPeer.ExpectDischarge(txnId1, true, new Rejected() { Error = new Error(ErrorCode.InternalError) { Description = "Unknown error" } });
+
+                // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                testPeer.ExpectDeclare(txnId2);
+
+                Assert.Catch<TransactionRolledBackException>(() => session.Rollback(), "Rollback operation should have failed.");
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+                stateMatcher = state =>
+                {
+                    Assert.IsInstanceOf<TransactionalState>(state);
+                    var transactionalState = (TransactionalState) state;
+                    CollectionAssert.AreEqual(txnId2, transactionalState.TxnId);
+                    Assert.IsNull(transactionalState.Outcome);
+                };
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, stateMatcher: stateMatcher, responseState: new TransactionalState
+                {
+                    Outcome = new Accepted(),
+                    TxnId = txnId2
+                }, responseSettled: true);
+                testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+                producer.Send(session.CreateMessage());
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCommitTransactedSessionWithConsumerReceivingAllMessages()
+        {
+            DoCommitTransactedSessionWithConsumerTestImpl(1, 1, false, false);
+        }
+
+        [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+        public void TestCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseBefore()
+        {
+            DoCommitTransactedSessionWithConsumerTestImpl(1, 1, true, true);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseAfter()
+        {
+            DoCommitTransactedSessionWithConsumerTestImpl(1, 1, true, false);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCommitTransactedSessionWithConsumerReceivingSomeMessages()
+        {
+            DoCommitTransactedSessionWithConsumerTestImpl(5, 2, false, false);
+        }
+
+        [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+        public void TestCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesBefore()
+        {
+            DoCommitTransactedSessionWithConsumerTestImpl(5, 2, true, true);
+        }
+
+        [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+        public void TestCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesAfter()
+        {
+            DoCommitTransactedSessionWithConsumerTestImpl(5, 2, true, false);
+        }
+
+        private void DoCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, bool closeConsumer, bool closeBeforeCommit)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), transferCount);
+
+                for (int i = 1; i <= consumeCount; i++)
+                {
+                    // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+                    testPeer.ExpectDisposition(settled: true, stateMatcher: state =>
+                    {
+                        Assert.IsInstanceOf<TransactionalState>(state);
+                        var transactionalState = (TransactionalState) state;
+                        Assert.AreEqual(txnId, transactionalState.TxnId);
+                        Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+                    });
+                }
+
+                IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+
+                for (int i = 1; i <= consumeCount; i++)
+                {
+                    IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
+                    Assert.NotNull(receivedMessage);
+                    Assert.IsInstanceOf<ITextMessage>(receivedMessage);
+                }
+
+                // Expect the consumer to close now
+                if (closeConsumer && closeBeforeCommit)
+                {
+                    // Expect the client to then drain off all credit from the link.
+                    testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
+
+                    // Expect the messages that were not consumed to be released
+                    int unconsumed = transferCount - consumeCount;
+                    for (int i = 1; i <= unconsumed; i++)
+                    {
+                        testPeer.ExpectDispositionThatIsReleasedAndSettled();
+                    }
+
+                    // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                    // and reply with accepted and settled disposition to indicate the commit succeeded
+                    testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+                    // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                    // reply with a declared disposition state containing the txnId.
+                    testPeer.ExpectDeclare(txnId);
+
+                    // Now the deferred close should be performed.
+                    testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                    messageConsumer.Close();
+                }
+                else
+                {
+                    // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                    // and reply with accepted and settled disposition to indicate the commit succeeded
+                    testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+                    // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                    // reply with a declared disposition state containing the txnId.
+                    testPeer.ExpectDeclare(txnId);
+                }
+
+                session.Commit();
+
+                if (closeConsumer && !closeBeforeCommit)
+                {
+                    testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                    // Expect the messages that were not consumed to be released
+                    int unconsumed = transferCount - consumeCount;
+                    for (int i = 1; i <= unconsumed; i++)
+                    {
+                        testPeer.ExpectDispositionThatIsReleasedAndSettled();
+                    }
+
+                    messageConsumer.Close();
+                }
+
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+                testPeer.ExpectClose();
+
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerWithNoMessageCanCloseBeforeCommit()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+
+                // TODO: qpid-jms extend 2 additional flow links
+                // 1) Drain related with deferred consumer close, this feature is currently 
+                //    not implemented.
+                // 2) Consumer pull - not implemented
+                // testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
+                // testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+                Assert.IsNull(messageConsumer.ReceiveNoWait());
+
+                messageConsumer.Close();
+
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the commit succeeded
+                testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+                // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                testPeer.ExpectDeclare(txnId);
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                session.Commit();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerWithNoMessageCanCloseBeforeRollback()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+
+                // TODO: qpid-jms extend 2 additional flow links
+                // 1) Drain related with deferred consumer close, this feature is currently 
+                //    not implemented.
+                // 2) Consumer pull - not implemented
+                // testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true);
+                // testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+                Assert.IsNull(messageConsumer.ReceiveNoWait());
+
+                messageConsumer.Close();
+
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the commit succeeded
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                testPeer.ExpectDeclare(txnId);
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                session.Rollback();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestProducedMessagesOnTransactedSessionCarryTxnId()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Create a producer to use in provoking creation of the AMQP transaction
+                testPeer.ExpectSenderAttach();
+
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
+                    stateMatcher: state =>
+                    {
+                        Assert.IsInstanceOf<TransactionalState>(state);
+                        TransactionalState transactionalState = (TransactionalState) state;
+                        CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+                        Assert.IsNull(transactionalState.Outcome);
+                    },
+                    responseState: new TransactionalState() { TxnId = txnId, Outcome = new Accepted() },
+                    responseSettled: true);
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                producer.Send(session.CreateMessage());
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestProducedMessagesOnTransactedSessionCanBeReused()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Create a producer to use in provoking creation of the AMQP transaction
+                testPeer.ExpectSenderAttach();
+
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+                IMessage message = session.CreateMessage();
+                for (int i = 0; i < 3; i++)
+                {
+                    testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
+                        stateMatcher: state =>
+                        {
+                            Assert.IsInstanceOf<TransactionalState>(state);
+                            TransactionalState transactionalState = (TransactionalState) state;
+                            CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+                            Assert.IsNull(transactionalState.Outcome);
+                        },
+                        responseState: new TransactionalState() { TxnId = txnId, Outcome = new Accepted() },
+                        responseSettled: true);
+
+                    message.Properties.SetInt("sequence", i);
+
+                    producer.Send(message);
+                }
+
+                // Expect rollback on close without a commit call.
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+                testPeer.ExpectClose();
+
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestRollbackTransactedSessionWithConsumerReceivingAllMessages()
+        {
+            DoRollbackTransactedSessionWithConsumerTestImpl(1, 1, false);
+        }
+
+        [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+        public void TestRollbackTransactedSessionWithConsumerReceivingAllMessagesThenCloses()
+        {
+            DoRollbackTransactedSessionWithConsumerTestImpl(1, 1, true);
+        }
+
+        [Test, Timeout(20_000), Ignore("TODO: Fix")]
+        public void TestRollbackTransactedSessionWithConsumerReceivingSomeMessages()
+        {
+            DoRollbackTransactedSessionWithConsumerTestImpl(5, 2, false);
+        }
+
+        [Test, Timeout(20_000), Ignore("Until deferred close is implemented for AmqpConsumer")]
+        public void TestRollbackTransactedSessionWithConsumerReceivingSomeMessagesThenCloses()
+        {
+            DoRollbackTransactedSessionWithConsumerTestImpl(5, 2, true);
+        }
+
+        private void DoRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, bool closeConsumer)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), transferCount);
+
+                for (int i = 1; i <= consumeCount; i++)
+                {
+                    // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+                    testPeer.ExpectDisposition(settled: true, stateMatcher: state =>
+                    {
+                        Assert.IsInstanceOf<TransactionalState>(state);
+                        var transactionalState = (TransactionalState) state;
+                        Assert.AreEqual(txnId, transactionalState.TxnId);
+                        Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+                    });
+                }
+
+                IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+
+                for (int i = 1; i <= consumeCount; i++)
+                {
+                    IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
+                    Assert.IsNotNull(receivedMessage);
+                    Assert.IsInstanceOf<ITextMessage>(receivedMessage);
+                }
+
+                // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+                testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true, creditMatcher: c => Assert.AreEqual(0, c));
+
+                if (closeConsumer)
+                {
+                    // Expect the messages that were not consumed to be released
+                    int unconsumed = transferCount - consumeCount;
+                    for (int i = 1; i <= unconsumed; i++)
+                    {
+                        testPeer.ExpectDispositionThatIsReleasedAndSettled();
+                    }
+
+                    // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                    // and reply with accepted and settled disposition to indicate the commit succeeded
+                    testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+                    // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                    // reply with a declared disposition state containing the txnId.
+                    testPeer.ExpectDeclare(txnId);
+
+                    // Now the deferred close should be performed.
+                    testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                    messageConsumer.Close();
+                }
+                else
+                {
+                    // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                    // and reply with accepted and settled disposition to indicate the rollback succeeded
+                    testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                    // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                    // reply with a declared disposition state containing the txnId.
+                    testPeer.ExpectDeclare(txnId);
+
+                    // Expect the messages that were not consumed to be released
+                    int unconsumed = transferCount - consumeCount;
+                    for (int i = 1; i <= unconsumed; i++)
+                    {
+                        testPeer.ExpectDispositionThatIsReleasedAndSettled();
+                    }
+
+                    // Expect the consumer to be 'started' again as rollback completes
+                    testPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: c => Assert.Greater(c, 0));
+                }
+
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+                session.Rollback();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        // TODO: 
+        // TestRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer
+        // TestRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer
+
+        [Test, Timeout(20_000)]
+        public void TestDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                string queueName = "myQueue";
+                IQueue queue = session.GetQueue(queueName);
+
+                testPeer.ExpectReceiverAttach(linkNameMatcher: Assert.IsNotNull, targetMatcher: Assert.IsNotNull, sourceMatcher: source =>
+                {
+                    Assert.AreEqual(queueName, source.Address);
+                    Assert.IsFalse(source.Dynamic);
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_ACCEPTED);
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_REJECTED);
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_RELEASED);
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_MODIFIED);
+
+                    Assert.IsInstanceOf<Modified>(source.DefaultOutcome);
+                    Modified modified = (Modified) source.DefaultOutcome;
+                    Assert.IsTrue(modified.DeliveryFailed);
+                    Assert.IsFalse(modified.UndeliverableHere);
+                });
+
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+                session.CreateConsumer(queue);
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCoordinatorLinkSupportedOutcomes()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach(sourceMatcher: s =>
+                {
+                    Source source = (Source) s;
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_ACCEPTED);
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_REJECTED);
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_RELEASED);
+                    CollectionAssert.Contains(source.Outcomes, SymbolUtil.ATTACH_OUTCOME_MODIFIED);
+                });
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                connection.CreateSession(AcknowledgementMode.Transactional);
+
+                //Expect rollback on close
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestRollbackErrorCoordinatorClosedOnCommit()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                byte[] txnId2 = { 1, 2, 3, 4 };
+
+                testPeer.ExpectDeclare(txnId1);
+                testPeer.RemotelyCloseLastCoordinatorLinkOnDischarge(txnId: txnId1, dischargeState: false, nextTxnId: txnId2);
+                testPeer.ExpectCoordinatorAttach();
+                testPeer.ExpectDeclare(txnId2);
+                testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Transaction should have rolled back");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestRollbackErrorWhenCoordinatorRemotelyClosed()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+                testPeer.RemotelyCloseLastCoordinatorLink();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectCoordinatorAttach();
+                testPeer.ExpectDeclare(txnId);
+
+                testPeer.ExpectDischarge(txnId, true);
+
+                Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Transaction should have rolled back");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestNMSErrorCoordinatorClosedOnRollback()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                byte[] txnId2 = { 1, 2, 3, 4 };
+
+                testPeer.ExpectDeclare(txnId1);
+                testPeer.RemotelyCloseLastCoordinatorLinkOnDischarge(txnId: txnId1, dischargeState: true, nextTxnId: txnId2);
+                testPeer.ExpectCoordinatorAttach();
+                testPeer.ExpectDeclare(txnId2);
+                testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                Assert.Catch<NMSException>(() => session.Rollback(), "Rollback should have thrown a NMSException");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestNMSExceptionOnRollbackWhenCoordinatorRemotelyClosed()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+                testPeer.RemotelyCloseLastCoordinatorLink();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectCoordinatorAttach();
+                testPeer.ExpectDeclare(txnId);
+
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                Assert.Catch<NMSException>(() => session.Rollback(), "Rollback should have thrown a NMSException");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendAfterCoordinatorLinkClosedDuringTX()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a Declared disposition state containing the txnId.
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Create a producer to use in provoking creation of the AMQP transaction
+                testPeer.ExpectSenderAttach();
+
+                // Close the link, the messages should now just get dropped on the floor.
+                testPeer.RemotelyCloseLastCoordinatorLink();
+
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                producer.Send(session.CreateMessage());
+
+                // Expect that a new link will be created in order to start the next TX.
+                txnId = new byte[] { 1, 2, 3, 4 };
+                testPeer.ExpectCoordinatorAttach();
+                testPeer.ExpectDeclare(txnId);
+
+                // Expect that the session TX will rollback on close.
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestReceiveAfterCoordinatorLinkClosedDuringTX()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a Declared disposition state containing the txnId.
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Create a consumer and send it an initial message for receive to process.
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithNullContent());
+
+                // Close the link, the messages should now just get dropped on the floor.
+                testPeer.RemotelyCloseLastCoordinatorLink();
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                // receiving the message would normally ack it, since the TX is failed this
+                // should not result in a disposition going out.
+                IMessage received = consumer.Receive();
+                Assert.IsNotNull(received);
+
+                // Expect that a new link will be created in order to start the next TX.
+                txnId = new byte[] { 1, 2, 3, 4 };
+                testPeer.ExpectCoordinatorAttach();
+                testPeer.ExpectDeclare(txnId);
+
+                // Expect that the session TX will rollback on close.
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                Assert.Catch<TransactionRolledBackException>(() => session.Commit(), "Commit operation should have failed.");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSessionCreateFailsOnDeclareTimeout()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+                testPeer.ExpectDeclareButDoNotRespond();
+
+                // Expect the AMQP session to be closed due to the NMS session creation failure.
+                testPeer.ExpectEnd();
+
+                // TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
+                Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional), "Should have timed out waiting for declare.");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSessionCreateFailsOnDeclareRejection()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a Rejected disposition state to indicate failure.
+                testPeer.ExpectDeclareAndReject();
+
+                // Expect the AMQP session to be closed due to the NMS session creation failure.
+                testPeer.ExpectEnd();
+
+                Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional));
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSessionCreateFailsOnCoordinatorLinkRefusal()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                // Expect coordinator link, refuse it, expect detach reply
+                string errorMessage = "CoordinatorLinkRefusal-breadcrumb";
+                testPeer.ExpectCoordinatorAttach(refuseLink: true, error: new Error(ErrorCode.NotImplemented) { Description = errorMessage });
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: false, replyClosed: false);
+
+                // Expect the AMQP session to be closed due to the NMS session creation failure.
+                testPeer.ExpectEnd();
+
+                NMSException exception = Assert.Catch<NMSException>(() => connection.CreateSession(AcknowledgementMode.Transactional));
+                Assert.IsTrue(exception.Message.Contains(errorMessage), "Expected exception message to contain breadcrumb");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestTransactionRolledBackOnSessionCloseTimesOut()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+
+                // Closed session should roll-back the TX with a failed discharge
+                testPeer.ExpectDischargeButDoNotRespond(txnId, dischargeState: true);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                // TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
+                Assert.Catch<NMSException>(() => session.Close(), "Should have timed out waiting for discharge.");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestTransactionRolledBackTimesOut()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                byte[] txnId2 = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId1);
+
+                // Expect discharge but don't respond so that the request timeout kicks in and fails
+                // the discharge.  The pipelined declare should arrive as well and be discharged as the
+                // client attempts to recover to a known good state.
+                testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: true);
+
+                // Session should throw from the rollback and then try and recover.
+                testPeer.ExpectDeclare(txnId2);
+                testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                // TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
+                Assert.Catch<NMSException>(() => session.Rollback(), "Should have timed out waiting for discharge.");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestTransactionCommitTimesOut()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                byte[] txnId2 = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId1);
+
+                // Expect discharge but don't respond so that the request timeout kicks in and fails
+                // the discharge.  The pipelined declare should arrive as well and be discharged as the
+                // client attempts to recover to a known good state.
+                testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: false);
+                testPeer.ExpectDeclare(txnId2);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                // TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
+                Assert.Catch<NMSException>(() => session.Commit(), "Should have timed out waiting for discharge.");
+
+                // Session rolls back on close
+                testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000), Ignore("TODO: Fix")]
+        public void TestTransactionCommitTimesOutAndNoNextBeginTimesOut()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, "nms.requestTimeout=500");
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                byte[] txnId1 = { 5, 6, 7, 8 };
+                byte[] txnId2 = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId1);
+
+                // Expect discharge and don't respond so that the request timeout kicks in
+                // Expect pipelined declare and don't response so that the request timeout kicks in.
+                // The commit operation should throw a timed out exception at that point.
+                testPeer.ExpectDischargeButDoNotRespond(txnId1, dischargeState: false);
+                testPeer.ExpectDeclareButDoNotRespond();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+                // After the pipelined operations both time out, the session should attempt to
+                // recover by creating a new TX, then on close the session should roll it back
+                testPeer.ExpectDeclare(txnId2);
+                testPeer.ExpectDischarge(txnId2, dischargeState: true);
+
+                // TODO: Replace NMSException with sth more specific, in qpid-jms it is JmsOperationTimedOutException
+                Assert.Catch<NMSException>(() => session.Commit(), "Should have timed out waiting for discharge.");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000), Ignore("TODO: Fix")]
+        public void TestRollbackWithNoResponseForSuspendConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+                // Then expect a *settled* TransactionalState disposition for the message once received by the consumer
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    var transactionalState = (TransactionalState) state;
+                    CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+                    Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+                });
+
+                // Read one so we try to suspend on rollback
+                IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+                IMessage receivedMessage = messageConsumer.Receive(TimeSpan.FromSeconds(3));
+
+                Assert.NotNull(receivedMessage);
+                Assert.IsInstanceOf<ITextMessage>(receivedMessage);
+
+                // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+                testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: false);
+
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the rollback succeeded
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                testPeer.ExpectDeclare(txnId);
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                Assert.Catch<NMSException>(() => session.Rollback(), "Should throw a timed out exception");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerMessageOrderOnTransactedSession()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                int messageCount = 10;
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 5, 6, 7, 8 };
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Expect the browser enumeration to create a underlying consumer
+                testPeer.ExpectReceiverAttach();
+
+                // Expect initial credit to be sent, respond with some messages that are tagged with
+                // a sequence number we can use to determine if order is maintained.
+                testPeer.ExpectLinkFlowRespondWithTransfer(CreateMessageWithNullContent(), count: messageCount, addMessageNumberProperty: true);
+
+                for (int i = 1; i <= messageCount; i++)
+                {
+                    // Then expect an *settled* TransactionalState disposition for each message once received by the consumer
+                    testPeer.ExpectSettledTransactionalDisposition(txnId);
+                }
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                for (int i = 0; i < messageCount; i++)
+                {
+                    IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(500));
+                    Assert.IsNotNull(message);
+                    Assert.AreEqual(i, message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER));
+                }
+
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the rollback succeeded
+                testPeer.ExpectDischarge(txnId, true);
+                testPeer.ExpectEnd();
+
+                session.Close();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumeManyWithSingleTXPerMessage()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                int messageCount = 10;
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+
+                var txnIdQueue = new Queue<byte[]>(3);
+                txnIdQueue.Enqueue(new byte[] { 1, 2, 3, 4 });
+                txnIdQueue.Enqueue(new byte[] { 2, 4, 6, 8 });
+                txnIdQueue.Enqueue(new byte[] { 5, 4, 3, 2 });
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = txnIdQueue.Dequeue();
+                txnIdQueue.Enqueue(txnId);
+                testPeer.ExpectDeclare(txnId);
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue("myQueue");
+
+                // Expect the browser enumeration to create a underlying consumer
+                testPeer.ExpectReceiverAttach();
+
+                // Expect initial credit to be sent, respond with some messages that are tagged with
+                // a sequence number we can use to determine if order is maintained.
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithNullContent(), count: messageCount, addMessageNumberProperty: true);
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                for (int i = 0; i < messageCount; i++)
+                {
+                    // Then expect an *settled* TransactionalState disposition for each message once received by the consumer
+                    testPeer.ExpectSettledTransactionalDisposition(txnId);
+
+                    IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(500));
+                    Assert.NotNull(message);
+                    Assert.AreEqual(i, message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER));
+
+                    // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                    // and reply with accepted and settled disposition to indicate the commit succeeded
+                    testPeer.ExpectDischarge(txnId, dischargeState: false);
+
+                    // Expect the next transaction to start.
+                    txnId = txnIdQueue.Dequeue();
+                    txnIdQueue.Enqueue(txnId);
+                    testPeer.ExpectDeclare(txnId);
+                    
+                    session.Commit();
+                }
+                
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the rollback succeeded
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+                testPeer.ExpectEnd();
+
+                session.Close();
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 71f7f02..6c2f147 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -26,6 +26,7 @@
 using Amqp;
 using Amqp.Framing;
 using Amqp.Sasl;
+using Amqp.Transactions;
 using Amqp.Types;
 using Apache.NMS.AMQP.Util;
 using NLog;
@@ -55,6 +56,7 @@
 
         private ushort lastInitiatedChannel = 0;
         private uint lastInitiatedLinkHandle;
+        private uint lastInitiatedCoordinatorLinkHandle = 0;
 
         private readonly LinkedList<IMatcher> matchers = new LinkedList<IMatcher>();
         private readonly object matchersLock = new object();
@@ -273,7 +275,7 @@
             );
         }
 
-        public void ExpectLinkFlowRespondWithTransfer(Amqp.Message message, int count = 1)
+        public void ExpectLinkFlowRespondWithTransfer(Amqp.Message message, int count = 1, bool addMessageNumberProperty = false)
         {
             Action<uint> creditMatcher = credit => Assert.Greater(credit, 0);
 
@@ -283,7 +285,7 @@
                 drain: false,
                 sendDrainFlowResponse: false,
                 sendSettled: false,
-                addMessageNumberProperty: false,
+                addMessageNumberProperty: addMessageNumberProperty,
                 creditMatcher: creditMatcher,
                 nextIncomingId: 1
             );
@@ -306,7 +308,7 @@
 
             FrameMatcher<Flow> flowMatcher = new FrameMatcher<Flow>()
                 .WithAssertion(flow => Assert.AreEqual(drain, flow.Drain))
-                .WithAssertion(flow => creditMatcher(flow.LinkCredit));
+                .WithAssertion(flow => creditMatcher?.Invoke(flow.LinkCredit));
 
             if (nextIncomingId != null)
             {
@@ -419,19 +421,20 @@
             ExpectSenderAttach(sourceMatcher: Assert.NotNull, targetMatcher: Assert.NotNull);
         }
 
-        public void ExpectSenderAttach(Action<Source> sourceMatcher,
-            Action<Target> targetMatcher,
+        public void ExpectSenderAttach(Action<object> sourceMatcher,
+            Action<object> targetMatcher,
             bool refuseLink = false,
             uint creditAmount = 100,
-            bool senderSettled = false)
+            bool senderSettled = false,
+            Error error = null)
         {
             var attachMatcher = new FrameMatcher<Attach>()
                 .WithAssertion(attach => Assert.IsNotNull(attach.LinkName))
                 .WithAssertion(attach => Assert.AreEqual(attach.Role, Role.SENDER))
                 .WithAssertion(attach => Assert.AreEqual(senderSettled ? SenderSettleMode.Settled : SenderSettleMode.Unsettled, attach.SndSettleMode))
                 .WithAssertion(attach => Assert.AreEqual(attach.RcvSettleMode, ReceiverSettleMode.First))
-                .WithAssertion(attach => sourceMatcher(attach.Source as Source))
-                .WithAssertion(attach => targetMatcher(attach.Target as Target))
+                .WithAssertion(attach => sourceMatcher(attach.Source))
+                .WithAssertion(attach => targetMatcher(attach.Target))
                 .WithOnComplete(context =>
                 {
                     var attach = new Attach()
@@ -452,11 +455,21 @@
 
                     lastInitiatedLinkHandle = context.Command.Handle;
 
+                    if (context.Command.Target is Coordinator)
+                    {
+                        lastInitiatedCoordinatorLinkHandle = context.Command.Handle;
+                    }
+
                     context.SendCommand(attach);
 
                     if (refuseLink)
                     {
                         var detach = new Detach { Closed = true, Handle = context.Command.Handle };
+                        if (error != null)
+                        {
+                            detach.Error = error;
+                        }
+
                         context.SendCommand(detach);
                     }
                     else
@@ -684,6 +697,19 @@
             AddMatcher(dispositionMatcher);
         }
 
+        public void ExpectSettledTransactionalDisposition(byte[] txnId)
+        {
+            void StateMatcher(DeliveryState state)
+            {
+                Assert.IsInstanceOf<TransactionalState>(state);
+                var transactionalState = (TransactionalState) state;
+                Assert.IsInstanceOf<Accepted>(transactionalState.Outcome);
+                CollectionAssert.AreEqual(txnId, transactionalState.TxnId);
+            }
+
+            ExpectDisposition(settled: true, StateMatcher);
+        }
+
         public void ExpectTransferButDoNotRespond(Action<Amqp.Message> messageMatcher)
         {
             ExpectTransfer(messageMatcher: messageMatcher,
@@ -705,6 +731,17 @@
             );
         }
 
+        public void ExpectTransfer(Action<Amqp.Message> messageMatcher, Action<DeliveryState> stateMatcher, DeliveryState responseState, bool responseSettled)
+        {
+            ExpectTransfer(messageMatcher: messageMatcher,
+                stateMatcher: stateMatcher,
+                settled: false,
+                sendResponseDisposition: true,
+                responseState: responseState,
+                responseSettled: responseSettled
+            );
+        }
+
         public void ExpectTransfer(Action<Amqp.Message> messageMatcher,
             Action<DeliveryState> stateMatcher,
             bool settled,
@@ -862,7 +899,124 @@
                 responseSourceOverride: responseSourceOverride,
                 desiredCapabilitiesMatcher: linkDesiredCapabilitiesMatcher);
         }
-        
+
+        public void ExpectCoordinatorAttach(Action<object> sourceMatcher = null, bool refuseLink = false, Error error = null)
+        {
+            Action<object> coordinatorMatcher = Assert.IsInstanceOf<Coordinator>;
+            sourceMatcher = sourceMatcher ?? Assert.IsNotNull;
+
+            ExpectSenderAttach(sourceMatcher: sourceMatcher, targetMatcher: coordinatorMatcher, refuseLink: refuseLink, error: error);
+        }
+
+        public void ExpectDeclare(byte[] txnId)
+        {
+            ExpectTransfer(messageMatcher: DeclareMatcher, stateMatcher: Assert.IsNull, settled: false, sendResponseDisposition: true, responseState: new Declared() { TxnId = txnId },
+                responseSettled: true);
+        }
+
+        public void ExpectDeclareButDoNotRespond()
+        {
+            ExpectTransfer(messageMatcher: DeclareMatcher, stateMatcher: Assert.IsNull, settled: false, sendResponseDisposition: false, responseState: null, responseSettled: false);
+        }
+
+
+        public void ExpectDeclareAndReject()
+        {
+            ExpectTransfer(messageMatcher: DeclareMatcher, stateMatcher: Assert.IsNull, responseState: new Rejected(), responseSettled: true);
+        }
+
+        void DeclareMatcher(Amqp.Message message)
+        {
+            Assert.IsNotNull(message);
+            Assert.IsInstanceOf<AmqpValue>(message.BodySection);
+            Assert.IsInstanceOf<Declare>(((AmqpValue) message.BodySection).Value);
+        }
+
+        public void ExpectDischarge(byte[] txnId, bool dischargeState)
+        {
+            ExpectDischarge(txnId: txnId, dischargeState: dischargeState, new Accepted());
+        }
+
+        public void ExpectDischargeButDoNotRespond(byte[] txnId, bool dischargeState)
+        {
+            ExpectTransfer(messageMatcher: m => DischargeMatcher(m, txnId, dischargeState),
+                stateMatcher: Assert.IsNull,
+                settled: false,
+                sendResponseDisposition: false,
+                responseState: null,
+                responseSettled: true);
+        }
+
+        public void ExpectDischarge(byte[] txnId, bool dischargeState, DeliveryState responseState)
+        {
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with given response and settled disposition to indicate the outcome.
+            ExpectTransfer(messageMatcher: m => DischargeMatcher(m, txnId, dischargeState),
+                stateMatcher: Assert.IsNull,
+                settled: false,
+                sendResponseDisposition: true,
+                responseState: responseState,
+                responseSettled: true);
+        }
+
+        private void DischargeMatcher(Amqp.Message message, byte[] txnId, bool dischargeState)
+        {
+            Assert.IsNotNull(message);
+            var bodySection = message.BodySection as AmqpValue;
+            Assert.IsNotNull(bodySection);
+            var discharge = bodySection.Value as Discharge;
+            Assert.AreEqual(dischargeState, discharge.Fail);
+            CollectionAssert.AreEqual(txnId, discharge.TxnId);
+        }
+
+        public void RemotelyCloseLastCoordinatorLinkOnDischarge(byte[] txnId, bool dischargeState, byte[] nextTxnId)
+        {
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with given response and settled disposition to indicate the outcome.
+            void DischargeMatcher(Amqp.Message message)
+            {
+                Assert.IsNotNull(message);
+                var bodySection = message.BodySection as AmqpValue;
+                Assert.IsNotNull(bodySection);
+                var discharge = bodySection.Value as Discharge;
+                Assert.AreEqual(dischargeState, discharge.Fail);
+                CollectionAssert.AreEqual(txnId, discharge.TxnId);
+            }
+
+            ExpectTransfer(messageMatcher: DischargeMatcher, stateMatcher: Assert.IsNull, settled: false, sendResponseDisposition: false, responseState: null, responseSettled: false);
+
+            RemotelyCloseLastCoordinatorLink(expectDetachResponse: true, closed: true, error: new Error(ErrorCode.TransactionRollback) { Description = "Discharge of TX failed." });
+        }
+
+        public void RemotelyCloseLastCoordinatorLink()
+        {
+            RemotelyCloseLastCoordinatorLink(expectDetachResponse: true, closed: true, error: new Error(ErrorCode.TransactionRollback) { Description = "Discharge of TX failed." });
+        }
+
+        private void RemotelyCloseLastCoordinatorLink(bool expectDetachResponse, bool closed, Error error)
+        {
+            lock (matchersLock)
+            {
+                var matcher = GetLastMatcher();
+                matcher.WithOnComplete(context =>
+                {
+                    var detach = new Detach { Closed = true, Handle = lastInitiatedCoordinatorLinkHandle };
+                    if (error != null)
+                    {
+                        detach.Error = error;
+                    }
+
+                    context.SendCommand(detach);
+                });
+
+                if (expectDetachResponse)
+                {
+                    var detachMatcher = new FrameMatcher<Detach>().WithAssertion(detach => Assert.AreEqual(closed, detach.Closed));
+                    AddMatcher(detachMatcher);
+                }
+            }
+        }
+
         public void PurgeExpectations()
         {
             lock (matchersLock)