Merge pull request #3 from lukeabsent/AMQNET-637

Mostly work on tests and some fixes
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index 6257177..3f7e3db 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -263,6 +263,11 @@
             }
             set
             {
+                if (!session.Connection.ConnectionInfo.DelayedDeliverySupported)
+                {
+                    throw new NotSupportedException("Delayed Delivery is not supported");
+                }
+                
                 CheckClosed();
                 deliveryDelay = value;
             }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 1e86ced..6b40430 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -147,9 +147,9 @@
                     }
 
                     if (Array.Exists(capabilities,
-                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY)))
+                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS)))
                     {
-                        Info.DelayedDeliverySupported = true;
+                        Info.SharedSubsSupported = true;
                     }
                 }
 
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 7670d2f..d4ceeac 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -130,7 +130,7 @@
                 bool supported = false;
                 if (remoteOfferedCapabilities != null)
                 {
-                    if (Array.Exists(remoteOfferedCapabilities, symbol => symbol == SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS))
+                    if (Array.Exists(remoteOfferedCapabilities, symbol => SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS.Equals(symbol)))
                     {
                         supported = true;
                     }
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
index d5332f1..4799ada 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
@@ -269,15 +269,16 @@
                     case ulong _:
                     case int _:
                     case uint _:
-                        return new DateTime(621355968000000000L + (long) deliveryTime * 10000L, DateTimeKind.Utc);
+                        return new DateTime(621355968000000000L + Convert.ToInt64(deliveryTime) * 10000L, DateTimeKind.Utc);
                     default:
                         return syntheticDeliveryTime;
                 }
             }
             set
             {
+                // Assumption that if it is being set through property, then it is with purpose of send out this value 
                 syntheticDeliveryTime = value;
-                RemoveMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME);
+                SetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME, new DateTimeOffset(value).ToUnixTimeMilliseconds());
             }
         }
 
@@ -488,6 +489,7 @@
             target.connection = connection;
             target.consumerDestination = consumerDestination;
             target.syntheticExpiration = syntheticExpiration;
+            target.syntheticDeliveryTime = syntheticDeliveryTime;
             target.amqpTimeToLiveOverride = amqpTimeToLiveOverride;
             target.destination = destination;
             target.replyTo = replyTo;
@@ -508,7 +510,7 @@
             return MessageAnnotations != null && MessageAnnotations.Map.ContainsKey(annotationName);
         }
 
-        public void SetMessageAnnotation(Symbol symbolKeyName, string value)
+        public void SetMessageAnnotation(Symbol symbolKeyName, object value)
         {
             LazyCreateMessageAnnotations();
             MessageAnnotations.Map.Add(symbolKeyName, value);
diff --git a/src/NMS.AMQP/Util/SymbolUtil.cs b/src/NMS.AMQP/Util/SymbolUtil.cs
index bb64387..e3fe2c3 100644
--- a/src/NMS.AMQP/Util/SymbolUtil.cs
+++ b/src/NMS.AMQP/Util/SymbolUtil.cs
@@ -39,7 +39,7 @@
         public static readonly Symbol OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER = new Symbol("sole-connection-for-container");
         public static readonly Symbol OPEN_CAPABILITY_ANONYMOUS_RELAY = new Symbol("ANONYMOUS-RELAY");
         public static readonly Symbol OPEN_CAPABILITY_DELAYED_DELIVERY = new Symbol("DELAYED_DELIVERY");
-        public static readonly Symbol OPEN_CAPABILITY_SHARED_SUBS = new Symbol("SHARED_SUBS");
+        public static readonly Symbol OPEN_CAPABILITY_SHARED_SUBS = new Symbol("SHARED-SUBS");
 
         // Attach Frame 
         public readonly static Symbol ATTACH_EXPIRY_POLICY_LINK_DETACH = new Symbol("link-detach");
diff --git a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
index 2155d86..bf309c4 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
@@ -18,6 +18,7 @@
 using System;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using NMS.AMQP.Test.TestAmqp;
 using NUnit.Framework;
 
 namespace NMS.AMQP.Test
@@ -30,19 +31,32 @@
 
         protected string TestName => TestContext.CurrentContext.Test.Name;
 
+        static AmqpTestSupport()
+        {
+            Tracer.Trace = new NLogAdapter();
+        }
+        
         [TearDown]
         public void TearDown()
         {
             Connection?.Close();
         }
 
-        protected IConnection CreateAmqpConnection()
+        protected IConnection CreateAmqpConnectionStarted(string clientId = null)
+        {
+            var connection = CreateAmqpConnection(clientId);
+            connection.Start();
+            return connection;
+        }
+        
+        protected IConnection CreateAmqpConnection(string clientId = null)
         {
             string brokerUri = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_URI") ?? "amqp://127.0.0.1:5672";
             string userName = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CU") ?? "admin";
             string password = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CPWD") ?? "admin";
 
             NmsConnectionFactory factory = new NmsConnectionFactory(brokerUri);
+            factory.ClientId = clientId;
             return factory.CreateConnection(userName, password);
         }
 
@@ -108,11 +122,7 @@
             IQueue queue = session.GetQueue(TestName);
             IMessageConsumer consumer = session.CreateConsumer(queue);
 
-            IMessage message;
-            do
-            {
-                message = consumer.Receive(timeout);
-            } while (message != null);
+            PurgeConsumer(consumer, timeout);
 
             amqpConnection.Close();
         }
@@ -125,13 +135,18 @@
             ITopic queue = session.GetTopic(TestName);
             IMessageConsumer consumer = session.CreateConsumer(queue);
 
+            PurgeConsumer(consumer, timeout);
+
+            amqpConnection.Close();
+        }
+
+        protected void PurgeConsumer(IMessageConsumer consumer, TimeSpan timeout)
+        {
             IMessage message;
             do
             {
                 message = consumer.Receive(timeout);
             } while (message != null);
-
-            amqpConnection.Close();
         }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
index f7ce890..e167532 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
+++ b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
@@ -16,11 +16,11 @@
 -->
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
-    <TargetFrameworks>net462;netcoreapp2.2</TargetFrameworks>
+    <TargetFrameworks>net462;netcoreapp3.1</TargetFrameworks>
     <TargetFramework Condition="'$(AppTargetFramework)' != ''">$(AppTargetFramework)</TargetFramework>
     <RootNamespace>NMS.AMQP.Test</RootNamespace>
     <AssemblyName>NMS.AMQP.Interop.Test</AssemblyName>
-    <LangVersion>7.3</LangVersion>
+    <LangVersion>8</LangVersion>
   </PropertyGroup>
   
   <ItemGroup>
@@ -33,5 +33,12 @@
   
   <ItemGroup>
     <ProjectReference Include="..\..\src\NMS.AMQP\Apache-NMS-AMQP.csproj" />
+    <ProjectReference Include="..\Apache-NMS-AMQP-Test\Apache-NMS-AMQP-Test.csproj" />
+  </ItemGroup>
+  
+  <ItemGroup>
+    <None Update="NLog.config">
+      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+    </None>
   </ItemGroup>
 </Project>
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NLog.config b/test/Apache-NMS-AMQP-Interop-Test/NLog.config
new file mode 100644
index 0000000..c0f1581
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NLog.config
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+    <targets>
+        <target name="logconsole" xsi:type="Console" layout="${time} | ${level} | ${callsite:includeNamespace=false:methodName=false} | ${message}" />
+    </targets>
+
+    <rules>
+        <logger name="*" minlevel="Debug" writeTo="logconsole" />
+    </rules>
+</nlog>
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
index 4529783..9f174d4 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -16,6 +16,11 @@
  */
 
 using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
 using Apache.NMS;
 using NUnit.Framework;
 
@@ -84,6 +89,223 @@
             Assert.IsNull(messageConsumer.Receive(TimeSpan.FromSeconds(1)));
         }
 
+
+        [Test, Timeout(60_000)]
+        public void TestDurableSubscription()
+        {
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            int counter = 0;
+
+
+            using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+            string subscriptionName = "mySubscriptionName";
+            ITopic topicProducer = sessionProducer.GetTopic(TestName);
+            using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+
+            // First durable consumer, reads message but does not unsubscribe
+            using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+            using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            {
+                using ITopic topic = session.GetTopic(TestName);
+                using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+                {
+                    // Purge topic
+                    PurgeConsumer(messageConsumer, TimeSpan.FromSeconds(0.5));
+
+                    ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+                    producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+                    var message = messageConsumer.Receive();
+                    Assert.AreEqual("text0", message.Body<string>());
+                }
+            }
+
+            // Write some more messages while subscription is closed
+            for (int t = 0; t < 3; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Second durable consumer, reads message that were send during no-subscription and unsubscribe
+            using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+            using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            {
+                using ITopic topic = session.GetTopic(TestName);
+                using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+                {
+                    for (int t = 1; t <= 3; t++)
+                    {
+                        var message = messageConsumer.Receive();
+                        Assert.AreEqual("text" + t, message.Body<string>());
+                    }
+
+                    // Assert topic is empty after those msgs
+                    var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+                    Assert.IsNull(msgAtTheEnd);
+
+                    Assert.Throws<IllegalStateException>(() => session.Unsubscribe(subscriptionName)); // Error unsubscribing while consumer is on
+                }
+
+                session.Unsubscribe(subscriptionName);
+            }
+
+
+            // Send some messages again to verify we will not get them when create durable subscription
+            for (int t = 0; t < 3; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Third durable subscriber, expect NOT to read messages during no-subscription period
+            using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+            using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            {
+                using ITopic topic = session.GetTopic(TestName);
+                using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+                {
+                    // Assert topic is empty 
+                    var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+                    Assert.IsNull(msgAtTheEnd);
+                }
+
+                // And unsubscribe again
+                session.Unsubscribe(subscriptionName);
+            }
+        }
+
+
+        [Test, Timeout(60_000)]
+        public void TestSharedSubscription()
+        {
+            IMessageConsumer GetConsumer(string subscriptionName, String clientId)
+            {
+                var connection = CreateAmqpConnectionStarted(clientId);
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                var topic = session.GetTopic(TestName);
+                var messageConsumer = session.CreateSharedConsumer(topic, subscriptionName);
+                return messageConsumer;
+            }
+
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            string subscriptionName = "mySubscriptionName";
+            
+
+            var receivedMessages = new List<int>();
+
+            var messageConsumer1 = GetConsumer(subscriptionName, null);
+            var messageConsumer2 = GetConsumer(subscriptionName, null);
+            messageConsumer1.Listener += (msg) =>
+            {
+                receivedMessages.Add(1);
+                msg.Acknowledge();
+            };
+            messageConsumer2.Listener += (msg) =>
+            {
+                receivedMessages.Add(2);
+                msg.Acknowledge();
+            };
+            
+            // Now send some messages
+            using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITopic topicProducer = sessionProducer.GetTopic(TestName);
+            using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+            for (int t = 0; t < 10; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Give it some time to process
+            Thread.Sleep(TimeSpan.FromSeconds(2));
+            
+            // Assert message was routed to multiple consumers
+            Assert.AreEqual(2, receivedMessages.Distinct().Count());
+            Assert.AreEqual(10, receivedMessages.Count);
+        }
+        
+        [Test, Timeout(60_000)]
+        public void TestSharedDurableSubscription()
+        {
+            (IMessageConsumer,ISession,IConnection) GetConsumer(string subscriptionName, String clientId)
+            {
+                var connection = CreateAmqpConnection(clientId);
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                var topic = session.GetTopic(TestName);
+                var messageConsumer = session.CreateSharedDurableConsumer(topic, subscriptionName);
+                return (messageConsumer, session, connection);
+            }
+
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            string subscriptionName = "mySubscriptionName";
+            int messageSendCount = 1099;   
+
+            var receivedMessages = new ConcurrentBag<int>();
+
+            
+            IConnection connectionConsumer1, connectionConsumer2;
+            IMessageConsumer messageConsumer1, messageConsumer2;
+            
+            (messageConsumer1, _, connectionConsumer1) = GetConsumer(subscriptionName, null);
+            (messageConsumer2, _, connectionConsumer2) = GetConsumer(subscriptionName, null);
+            connectionConsumer1.Start();
+            connectionConsumer2.Start();
+            
+            messageConsumer1.Close();
+            messageConsumer2.Close();
+            connectionConsumer1.Close();
+            connectionConsumer2.Close();
+            
+            // Now send some messages
+            using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITopic topicProducer = sessionProducer.GetTopic(TestName);
+            using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+            for (int t = 0; t < messageSendCount; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Create consumers again and expect messages to be delivered to them
+            ISession sessionConsumer1, sessionConsumer2;
+            (messageConsumer1, sessionConsumer1, connectionConsumer1) = GetConsumer(subscriptionName, null);
+            (messageConsumer2, sessionConsumer2, connectionConsumer2) = GetConsumer(subscriptionName, null);
+            messageConsumer1.Listener += (msg) =>
+            {
+                receivedMessages.Add(1);
+                msg.Acknowledge();
+            };
+            messageConsumer2.Listener += (msg) =>
+            {
+                receivedMessages.Add(2);
+                msg.Acknowledge();
+            };
+            Task.Run(() => connectionConsumer1.Start()); // parallel to give both consumers chance to start at the same time
+            Task.Run(() => connectionConsumer2.Start());
+            
+            // Give it some time to process
+            Thread.Sleep(TimeSpan.FromSeconds(5));
+            
+            // Assert message was routed to multiple consumers
+            Assert.AreEqual(2, receivedMessages.Distinct().Count());
+            Assert.AreEqual(messageSendCount, receivedMessages.Count);
+            
+            messageConsumer1.Close();
+            messageConsumer2.Close();
+            sessionConsumer1.Unsubscribe(subscriptionName);
+            sessionConsumer2.Unsubscribe(subscriptionName);
+            
+        }
+
+
         [Test, Timeout(60_000)]
         public void TestSelectNoLocal()
         {
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
new file mode 100644
index 0000000..5bd182d
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test
+{
+    [TestFixture]
+    public class NmsMessageProducerTest : AmqpTestSupport
+    {
+        [Test, Timeout(60_000)]
+        public void TestDeliveryDelay()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+            var deliveryDelay = TimeSpan.FromSeconds(7);
+            
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.DeliveryDelay = deliveryDelay;
+
+            DateTime? receivingTime = null;
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            var receivingTask = Task.Run(() =>
+            {
+                while (true)
+                {
+                    var message = consumer.Receive(TimeSpan.FromMilliseconds(100));
+                    if (message != null && message.Body<string>() == "Hello")
+                    {
+                        receivingTime = DateTime.Now;
+                        return;
+                    }
+                }
+            });
+            
+            
+            DateTime sendTime = DateTime.Now;
+            ITextMessage message = session.CreateTextMessage("Hello");
+            producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+            // Wait that delivery delay
+            Thread.Sleep(deliveryDelay);
+
+            receivingTask.Wait(TimeSpan.FromSeconds(20)); // make sure its done
+
+            var measuredDelay = (receivingTime.Value - sendTime);
+            
+            Assert.Greater(measuredDelay.TotalMilliseconds, deliveryDelay.TotalMilliseconds* 0.5);
+            Assert.Less(measuredDelay.TotalMilliseconds, deliveryDelay.TotalMilliseconds*1.5);
+        }
+
+      
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
index 68c6937..2de2f86 100644
--- a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
+++ b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
@@ -16,7 +16,7 @@
 -->
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
-    <TargetFrameworks>net462;netcoreapp2.2</TargetFrameworks>
+    <TargetFrameworks>net462;netcoreapp3.1</TargetFrameworks>
     <TargetFramework Condition="'$(AppTargetFramework)' != ''">$(AppTargetFramework)</TargetFramework>
     <RootNamespace>NMS.AMQP.Test</RootNamespace>
     <AssemblyName>NMS.AMQP.Test</AssemblyName>
diff --git a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
index 9a13133..fac9499 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
@@ -50,6 +50,26 @@
             return connection;
         }
 
+        protected INMSContext EstablishNMSContext(TestAmqpPeer testPeer, string optionsString = null, Symbol[] serverCapabilities = null, Fields serverProperties = null, bool setClientId = true, AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge)
+        {
+            testPeer.ExpectSaslPlain("guest", "guest");
+            testPeer.ExpectOpen(serverCapabilities: serverCapabilities, serverProperties: serverProperties);
+
+            // Each connection creates a session for managing temporary destinations etc.
+            testPeer.ExpectBegin();
+
+            var remoteUri = BuildUri(testPeer, optionsString);
+            var connectionFactory = new NmsConnectionFactory(remoteUri);
+            var context = connectionFactory.CreateContext("guest", "guest", acknowledgementMode);
+            if (setClientId)
+            {
+                // Set a clientId to provoke the actual AMQP connection process to occur.
+                context.ClientId = "ClientName";
+            }
+            
+            return context;
+        }
+        
         private static string BuildUri(TestAmqpPeer testPeer, string optionsString)
         {
             string baseUri = "amqp://127.0.0.1:" + testPeer.ServerPort.ToString();
diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
new file mode 100644
index 0000000..dbe936a
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    [TestFixture]
+    public class MessageDeliveryTimeTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20000)]
+        public void TestReceiveMessageWithoutDeliveryTimeSet()
+        {
+            DoReceiveMessageDeliveryTime(null, null);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsDateTime()
+        {
+            DateTime deliveryTime = DateTimeOffset.FromUnixTimeMilliseconds(CurrentTimeInMillis() + 12345).DateTime.ToUniversalTime();
+            DoReceiveMessageDeliveryTime(deliveryTime, deliveryTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsULong()
+        {
+            ulong deliveryTime = (ulong) (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds((long) deliveryTime).DateTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsLong()
+        {
+            long deliveryTime = (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsInt()
+        {
+            int deliveryTime = (int) (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsUInt()
+        {
+            uint deliveryTime = (uint) (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+        }
+
+        private long CurrentTimeInMillis()
+        {
+            return new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
+        }
+
+        private void DoReceiveMessageDeliveryTime(object setDeliveryTimeAnnotation, DateTime? expectedDeliveryTime)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer, "amqp.traceFrames=true");
+                connection.Start();
+                testPeer.ExpectBegin();
+                var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                var message = CreateMessageWithNullContent();
+                if (setDeliveryTimeAnnotation != null)
+                {
+                    message.MessageAnnotations = message.MessageAnnotations ?? new MessageAnnotations();
+                    message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME] = setDeliveryTimeAnnotation;
+                }
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message);
+                testPeer.ExpectDisposition(true, (deliveryState) => { });
+
+                DateTime startingTimeFrom = DateTime.UtcNow;
+                var messageConsumer = session.CreateConsumer(queue);
+                var receivedMessage = messageConsumer.Receive(TimeSpan.FromMilliseconds(3000));
+                DateTime receivingTime = DateTime.UtcNow;
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+
+                Assert.IsNotNull(receivedMessage);
+                if (expectedDeliveryTime != null)
+                {
+                    Assert.AreEqual(receivedMessage.NMSDeliveryTime, expectedDeliveryTime.Value);
+                }
+                else
+                {
+                    Assert.LessOrEqual(receivedMessage.NMSDeliveryTime, receivingTime);
+                    Assert.GreaterOrEqual(receivedMessage.NMSDeliveryTime, startingTimeFrom);
+                }
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestDeliveryDelayNotSupportedThrowsException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+                Assert.Throws<NotSupportedException>(() => producer.DeliveryDelay = TimeSpan.FromMinutes(17));
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestDeliveryDelayHasItsReflectionInAmqpAnnotations()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                // Determine current time
+                TimeSpan deliveryDelay = TimeSpan.FromMinutes(17);
+                long currentUnixEpochTime = new DateTimeOffset(DateTime.UtcNow + deliveryDelay).ToUnixTimeMilliseconds();
+                long currentUnixEpochTime2 = new DateTimeOffset(DateTime.UtcNow + deliveryDelay + deliveryDelay).ToUnixTimeMilliseconds();
+
+                IConnection connection = base.EstablishConnection(testPeer,
+                    serverCapabilities: new Symbol[] {SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY, SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER});
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.DeliveryDelay = deliveryDelay;
+
+                // Create and transfer a new message
+                testPeer.ExpectTransfer(message =>
+                {
+                    Assert.GreaterOrEqual((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime);
+                    Assert.Less((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime2);
+
+                    Assert.IsTrue(message.Header.Durable);
+                });
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = session.CreateTextMessage();
+
+                producer.Send(textMessage);
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
new file mode 100644
index 0000000..c1bb33b
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
@@ -0,0 +1,975 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Apache.NMS;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    // Adapted from ConsumerIntegrationTest to use NMSContext
+    [TestFixture]
+    public class NMSConsumerIntegrationTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestCloseConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var consumer = context.CreateConsumer(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        // TODO No connection Listener in context
+        // [Test, Timeout(20_000)]
+        // public void TestRemotelyCloseConsumer()
+        // {
+        //     Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+        //     string errorMessage = "buba";
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         ManualResetEvent consumerClosed = new ManualResetEvent(false);
+        //         ManualResetEvent exceptionFired = new ManualResetEvent(false);
+        //
+        //         mockConnectionListener
+        //             .Setup(listener => listener.OnConsumerClosed(It.IsAny<IMessageConsumer>(), It.IsAny<Exception>()))
+        //             .Callback(() => consumerClosed.Set());
+        //
+        //         var context = (NmsContext) EstablishNMSContext(testPeer, "amqp.traceFrames=true");
+        //         context.ConnectionInterruptedListener += () => { consumerClosed.Set(); };// AddConnectionListener(mockConnectionListener.Object);}
+        //         // context.list ConnectionInterruptedListener += () => { consumerClosed.Set(); };// AddConnectionListener(mockConnectionListener.Object);}
+        //         context.ExceptionListener += exception => { exceptionFired.Set(); };
+        //
+        //         testPeer.ExpectBegin();
+        //         // ISession session = context.CreateSession(AcknowledgementMode.AutoAcknowledge);
+        //
+        //         // Create a consumer, then remotely end it afterwards.
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlow();
+        //         testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, errorMessage: errorMessage, delayBeforeSend: 400);
+        //
+        //         IQueue queue = context.GetQueue("myQueue");
+        //         var consumer = context.CreateConsumer(queue);
+        //         
+        //         
+        //         // Verify the consumer gets marked closed
+        //         testPeer.WaitForAllMatchersToComplete(1000);
+        //
+        //         Assert.True(consumerClosed.WaitOne(2000), "Consumer closed callback didn't trigger");
+        //         Assert.False(exceptionFired.WaitOne(20), "Exception listener shouldn't fire with no MessageListener");
+        //
+        //         // Try closing it explicitly, should effectively no-op in client.
+        //         // The test peer will throw during close if it sends anything.
+        //         consumer.Close();
+        //     }
+        // }
+
+        // [Test, Timeout(20_000)]
+        // public void TestRemotelyCloseConsumerWithMessageListenerFiresExceptionListener()
+        // {
+        //     Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+        //     string errorMessage = "buba";
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         ManualResetEvent consumerClosed = new ManualResetEvent(false);
+        //         ManualResetEvent exceptionFired = new ManualResetEvent(false);
+        //
+        //         mockConnectionListener
+        //             .Setup(listener => listener.OnConsumerClosed(It.IsAny<IMessageConsumer>(), It.IsAny<Exception>()))
+        //             .Callback(() => consumerClosed.Set());
+        //
+        //         NmsConnection connection = (NmsConnection) EstablishConnection(testPeer);
+        //         connection.AddConnectionListener(mockConnectionListener.Object);
+        //         connection.ExceptionListener += exception => { exceptionFired.Set(); };
+        //
+        //         testPeer.ExpectBegin();
+        //         ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+        //
+        //         // Create a consumer, then remotely end it afterwards.
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlow();
+        //         testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, errorMessage: errorMessage, 10);
+        //
+        //         IQueue queue = session.GetQueue("myQueue");
+        //         IMessageConsumer consumer = session.CreateConsumer(queue);
+        //
+        //         consumer.Listener += message => { };
+        //
+        //         // Verify the consumer gets marked closed
+        //         testPeer.WaitForAllMatchersToComplete(1000);
+        //
+        //         Assert.True(consumerClosed.WaitOne(2000), "Consumer closed callback didn't trigger");
+        //         Assert.True(exceptionFired.WaitOne(2000), "Exception listener should have fired with a MessageListener");
+        //
+        //         // Try closing it explicitly, should effectively no-op in client.
+        //         // The test peer will throw during close if it sends anything.
+        //         consumer.Close();
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestReceiveMessageWithReceiveZeroTimeout()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                var consumer = context.CreateConsumer(queue);
+                IMessage message = consumer.Receive();
+                Assert.NotNull(message, "A message should have been received");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(10000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestExceptionInOnMessageReleasesInAutoAckMode()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
+                testPeer.ExpectDispositionThatIsReleasedAndSettled();
+
+                var consumer = context.CreateConsumer(queue);
+                consumer.Listener += message => throw new Exception();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(10000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCloseDurableTopicSubscriberDetachesWithCloseFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                
+                string topicName = "myTopic";
+                string subscriptionName = "mySubscription";
+                ITopic topic = context.GetTopic(topicName);
+
+                testPeer.ExpectDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateDurableConsumer(topic, subscriptionName, null, false);
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
+                durableConsumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerReceiveThrowsIfConnectionLost()
+        {
+            DoTestConsumerReceiveThrowsIfConnectionLost(false);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerTimedReceiveThrowsIfConnectionLost()
+        {
+            DoTestConsumerReceiveThrowsIfConnectionLost(true);
+        }
+
+        private void DoTestConsumerReceiveThrowsIfConnectionLost(bool useTimeout)
+        {
+            ManualResetEvent consumerReady = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("queue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.RunAfterLastHandler(() => { consumerReady.WaitOne(2000); });
+                testPeer.DropAfterLastMatcher(delay: 10);
+
+                var consumer = context.CreateConsumer(queue);
+                consumerReady.Set();
+
+                try
+                {
+                    if (useTimeout)
+                    {
+                        consumer.Receive(TimeSpan.FromMilliseconds(10_0000));
+                    }
+                    else
+                    {
+                        consumer.Receive();
+                    }
+
+
+                    Assert.Fail("An exception should have been thrown");
+                }
+                catch (NMSException)
+                {
+                    // Expected
+                }
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        //  TODO No connection Listener in context
+        // [Test, Timeout(20_000)]
+        // public void TestConsumerReceiveNoWaitThrowsIfConnectionLost()
+        // {
+        //     ManualResetEvent disconnected = new ManualResetEvent(false);
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         NmsContext context = (NmsContext) EstablishNMSContext(testPeer);
+        //         Mock<INmsConnectionListener> connectionListener = new Mock<INmsConnectionListener>();
+        //
+        //         connectionListener
+        //             .Setup(listener => listener.OnConnectionFailure(It.IsAny<NMSException>()))
+        //             .Callback(() => { disconnected.Set(); });
+        //
+        //         context.AddConnectionListener(connectionListener.Object);
+        //
+        //         context.Start();
+        //
+        //         testPeer.ExpectBegin();
+        //
+        //         IQueue queue = context.GetQueue("queue");
+        //
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlow();
+        //         testPeer.RemotelyCloseConnection(expectCloseResponse: true, errorCondition: ConnectionError.CONNECTION_FORCED, errorMessage: "buba");
+        //
+        //         var consumer = context.CreateConsumer(queue);
+        //
+        //         Assert.True(disconnected.WaitOne(), "Connection should be disconnected");
+        //
+        //         try
+        //         {
+        //             consumer.ReceiveNoWait();
+        //             Assert.Fail("An exception should have been thrown");
+        //         }
+        //         catch (NMSException)
+        //         {
+        //             // Expected
+        //         }
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestSetMessageListenerAfterStartAndSend()
+        {
+            int messageCount = 4;
+            CountdownEvent latch = new CountdownEvent(messageCount);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), messageCount);
+
+                var consumer = context.CreateConsumer(destination);
+
+                for (int i = 0; i < messageCount; i++)
+                {
+                    testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+                }
+
+                consumer.Listener += message => latch.Signal();
+
+                Assert.True(latch.Wait(4000), "Messages not received within given timeout. Count remaining: " + latch.CurrentCount);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        // TODO Connection is started anyway when creating consumer
+        // [Test, Timeout(20_000)]
+        // public void TestNoReceivedMessagesWhenConnectionNotStarted()
+        // {
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         var context = EstablishNMSContext(testPeer);
+        //         
+        //         testPeer.ExpectBegin();
+        //
+        //         IQueue destination = context.GetQueue("myQueue");
+        //
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 3);
+        //
+        //         // cREATING CONSUMER STARTS CONNECTION
+        //         var consumer = context.CreateConsumer(destination);
+        //
+        //         // Wait for a message to arrive then try and receive it, which should not happen
+        //         // since the connection is not started.
+        //         Assert.Null(consumer.Receive(TimeSpan.FromMilliseconds(100)));
+        //
+        //         testPeer.ExpectEnd();
+        //         testPeer.ExpectClose();
+        //         context.Close();
+        //
+        //         testPeer.WaitForAllMatchersToComplete(2000);
+        //     }
+        // }
+
+        // TODO Connection is started anyway when creating consumer
+        // [Test, Timeout(20_000)]
+        // public void TestNoReceivedNoWaitMessagesWhenConnectionNotStarted()
+        // {
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         var context = EstablishNMSContext(testPeer);
+        //
+        //         testPeer.ExpectBegin();
+        //
+        //         IQueue destination = context.GetQueue("myQueue");
+        //
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 3);
+        //
+        //         var consumer = context.CreateConsumer(destination);
+        //
+        //         // Wait for a message to arrive then try and receive it, which should not happen
+        //         // since the connection is not started.
+        //         Assert.Null(consumer.ReceiveNoWait());
+        //
+        //         testPeer.ExpectEnd();
+        //         testPeer.ExpectClose();
+        //         context.Close();
+        //
+        //         testPeer.WaitForAllMatchersToComplete(2000);
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestSyncReceiveFailsWhenListenerSet()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+
+                var consumer = context.CreateConsumer(destination);
+
+                consumer.Listener += message => { };
+
+                Assert.Catch<NMSException>(() => consumer.Receive(), "Should have thrown an exception.");
+                Assert.Catch<NMSException>(() => consumer.Receive(TimeSpan.FromMilliseconds(1000)), "Should have thrown an exception.");
+                Assert.Catch<NMSException>(() => consumer.ReceiveNoWait(), "Should have thrown an exception.");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateProducerInOnMessage()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                IQueue outbound = context.GetQueue("ForwardDest");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                testPeer.ExpectSenderAttach();
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull);
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                var consumer = context.CreateConsumer(destination);
+
+                consumer.Listener += message =>
+                {
+                    var producer = context.CreateProducer();
+                    producer.Send(outbound, message);
+                    producer.Close();
+                };
+
+                testPeer.WaitForAllMatchersToComplete(10_000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageListenerCallsConnectionCloseThrowsIllegalStateException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ManualResetEvent latch = new ManualResetEvent(false);
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        context.Close();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+
+                    latch.Set();
+                };
+
+                Assert.True(latch.WaitOne(4000), "Messages not received within given timeout.");
+                Assert.IsNotNull(exception);
+                Assert.IsInstanceOf<IllegalStateException>(exception);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                // testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageListenerCallsConnectionStopThrowsIllegalStateException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ManualResetEvent latch = new ManualResetEvent(false);
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        context.Stop();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+
+                    latch.Set();
+                };
+
+                Assert.True(latch.WaitOne(3000), "Messages not received within given timeout.");
+                Assert.IsNotNull(exception);
+                Assert.IsInstanceOf<IllegalStateException>(exception);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageListenerCallsSessionCloseThrowsIllegalStateException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ManualResetEvent latch = new ManualResetEvent(false);
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        context.Close();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+
+                    latch.Set();
+                };
+
+                Assert.True(latch.WaitOne(3000), "Messages not received within given timeout.");
+                Assert.IsNotNull(exception);
+                Assert.IsInstanceOf<IllegalStateException>(exception);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                // testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        // TODO: To be fixed
+        [Test, Timeout(20_000), Ignore("Ignore")]
+        public void TestMessageListenerClosesItsConsumer()
+        {
+            var latch = new ManualResetEvent(false);
+            var exceptionListenerFired = new ManualResetEvent(false);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                context.ExceptionListener += _ => exceptionListenerFired.Set();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true, creditMatcher: credit => Assert.AreEqual(99, credit)); // Not sure if expected credit is right
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        consumer.Close();
+                        latch.Set();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(1000)), "Process not completed within given timeout");
+                Assert.IsNull(exception, "No error expected during close");
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                Assert.False(exceptionListenerFired.WaitOne(20), "Exception listener shouldn't have fired");
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestRecoverOrderingWithAsyncConsumer()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+            Exception asyncError = null;
+
+            int recoverCount = 5;
+            int messageCount = 8;
+            int testPayloadLength = 255;
+            string payload = Encoding.UTF8.GetString(new byte[testPayloadLength]);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, acknowledgementMode:AcknowledgementMode.ClientAcknowledge);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: new Amqp.Message() { BodySection = new AmqpValue() { Value = payload } },
+                    count: messageCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, messageCount)
+                );
+
+                var consumer = context.CreateConsumer(destination);
+                
+                bool complete = false;
+                int messageSeen = 0;
+                int expectedIndex = 0;
+                consumer.Listener += message =>
+                {
+                    if (complete)
+                    {
+                        return;
+                    }
+
+                    try
+                    {
+                        int actualIndex = message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER);
+                        Assert.AreEqual(expectedIndex, actualIndex, "Received Message Out Of Order");
+
+                        // don't ack the message until we receive it X times
+                        if (messageSeen < recoverCount)
+                        {
+                            context.Recover();
+                            messageSeen++;
+                        }
+                        else
+                        {
+                            messageSeen = 0;
+                            expectedIndex++;
+
+                            // Have the peer expect the accept the disposition (1-based, hence pre-incremented).
+                            testPeer.ExpectDisposition(settled: true,
+                                stateMatcher: state => Assert.AreEqual(state.Descriptor.Code, MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code
+                                ));
+
+                            message.Acknowledge();
+
+                            if (expectedIndex == messageCount)
+                            {
+                                complete = true;
+                                latch.Set();
+                            }
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        complete = true;
+                        asyncError = e;
+                        latch.Set();
+                    }
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromSeconds(15)), "Messages not received within given timeout.");
+                Assert.IsNull(asyncError, "Unexpected exception");
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCloseWaitsForAsyncDeliveryToComplete()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                consumer.Listener += _ =>
+                {
+                    latch.Set();
+                    Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSessionCloseWaitsForAsyncDeliveryToComplete()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                consumer.Listener += _ =>
+                {
+                    latch.Set();
+                    Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+                
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConnectionCloseWaitsForAsyncDeliveryToComplete()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                consumer.Listener += _ =>
+                {
+                    latch.Set();
+                    Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestRecoveredMessageShouldNotBeMutated()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, acknowledgementMode:AcknowledgementMode.ClientAcknowledge);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                IQueue destination = context.GetQueue("myQueue");
+                string originalPayload = "testMessage";
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue() { Value = originalPayload } }, count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+                NmsTextMessage message = consumer.Receive() as NmsTextMessage;
+                Assert.NotNull(message);
+                message.IsReadOnlyBody = false;
+                message.Text = message.Text + "Received";
+                context.Recover();
+
+                ITextMessage recoveredMessage = consumer.Receive() as ITextMessage;
+                Assert.IsNotNull(recoveredMessage);
+                Assert.AreNotEqual(message.Text, recoveredMessage.Text);
+                Assert.AreEqual(originalPayload, recoveredMessage.Text);
+                Assert.AreNotSame(message, recoveredMessage);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs
new file mode 100644
index 0000000..a1b43e8
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    // Adapted from SessionIntegrationTest to use NMSContext
+    [TestFixture]
+    public class NMSContextIntegrationTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestClose()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectClose();
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateProducer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectSenderAttach();
+
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectDetach(true, true, true);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                producer.Close();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                var consumer = context.CreateConsumer(context.GetQueue("myQueue"));
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateConsumerWithEmptySelector()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                IQueue queue = context.GetQueue("myQueue");
+                context.CreateConsumer(queue, "");
+                context.CreateConsumer(queue, "", noLocal: false);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateConsumerWithNullSelector()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                IQueue queue = context.GetQueue("myQueue");
+                context.CreateConsumer(queue, null);
+                context.CreateConsumer(queue, null, noLocal: false);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateDurableConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                string topicName = "myTopic";
+                ITopic topic = context.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateDurableConsumer(topic, subscriptionName, null, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+
+        [Test, Timeout(20_000)]
+        public void TestCreateTemporaryQueue()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                string dynamicAddress = "myTempQueueAddress";
+                testPeer.ExpectTempQueueCreationAttach(dynamicAddress);
+
+                ITemporaryQueue temporaryQueue = context.CreateTemporaryQueue();
+                Assert.NotNull(temporaryQueue, "TemporaryQueue object was null");
+                Assert.NotNull(temporaryQueue.QueueName, "TemporaryQueue queue name was null");
+                Assert.AreEqual(dynamicAddress, temporaryQueue.QueueName, "TemporaryQueue name not as expected");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateTemporaryTopic()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                string dynamicAddress = "myTempTopicAddress";
+                testPeer.ExpectTempTopicCreationAttach(dynamicAddress);
+
+                ITemporaryTopic temporaryTopic = context.CreateTemporaryTopic();
+                Assert.NotNull(temporaryTopic, "TemporaryTopic object was null");
+                Assert.NotNull(temporaryTopic.TopicName, "TemporaryTopic name was null");
+                Assert.AreEqual(dynamicAddress, temporaryTopic.TopicName, "TemporaryTopic name not as expected");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                string topicName = "myTopic";
+                ITopic topic = context.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateSharedConsumer(topic, subscriptionName, null); //, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(20000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedDurableConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                string topicName = "myTopic";
+                ITopic topic = context.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateSharedDurableConsumer(topic, subscriptionName, null); //, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs
new file mode 100644
index 0000000..22563a0
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    // Adapted from ProducerIntegrationTest to use NMSContext
+    [TestFixture]
+    public class NMSProducerIntegrationTest : IntegrationTestFixture
+    {
+        private const long TICKS_PER_MILLISECOND = 10000;
+
+        [Test, Timeout(20_000)]
+        public void TestCloseSender()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                producer.Close();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSentTextMessageCanBeModified()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create and transfer a new message
+                String text = "myMessage";
+                testPeer.ExpectTransfer(x => Assert.AreEqual(text, (x.BodySection as AmqpValue).Value));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage(text);
+                producer.Send(queue, message);
+
+                Assert.AreEqual(text, message.Text);
+                message.Text = text + text;
+                Assert.AreEqual(text + text, message.Text);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestDefaultDeliveryModeProducesDurableMessages()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create and transfer a new message
+                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = context.CreateTextMessage();
+
+                producer.Send(queue, textMessage);
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestProducerOverridesMessageDeliveryMode()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create and transfer a new message, explicitly setting the deliveryMode on the
+                // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+                // that the producer ignores this value and sends the message as PERSISTENT(/durable)
+                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = context.CreateTextMessage();
+                textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
+                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode);
+
+                producer.Send(queue, textMessage);
+
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                context.Close();
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageNonPersistentProducerSetDurableFalse()
+        {
+            DoSendingMessageNonPersistentTestImpl(true);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageNonPersistentProducerOmitsHeader()
+        {
+            DoSendingMessageNonPersistentTestImpl(false);
+        }
+
+        private void DoSendingMessageNonPersistentTestImpl(bool setPriority)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                //Add capability to indicate support for ANONYMOUS-RELAY
+                Symbol[] serverCapabilities = {SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY};
+                var context = EstablishNMSContext(testPeer, serverCapabilities: serverCapabilities);
+                testPeer.ExpectBegin();
+
+                string queueName = "myQueue";
+                Action<object> targetMatcher = t =>
+                {
+                    var target = t as Target;
+                    Assert.IsNotNull(target);
+                };
+
+
+                testPeer.ExpectSenderAttach(targetMatcher: targetMatcher, sourceMatcher: Assert.NotNull, senderSettled: false);
+
+                IQueue queue = context.GetQueue(queueName);
+                INMSProducer producer = context.CreateProducer();
+
+                byte priority = 5;
+                String text = "myMessage";
+                testPeer.ExpectTransfer(messageMatcher: message =>
+                    {
+                        if (setPriority)
+                        {
+                            Assert.IsFalse(message.Header.Durable);
+                            Assert.AreEqual(priority, message.Header.Priority);
+                        }
+
+                        Assert.AreEqual(text, (message.BodySection as AmqpValue).Value);
+                    }, stateMatcher: Assert.IsNull,
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true);
+
+                ITextMessage textMessage = context.CreateTextMessage(text);
+
+                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+                if (setPriority)
+                    producer.Priority = (MsgPriority) priority;
+
+                producer.Send(queue, textMessage);
+
+                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode, "Should have NonPersistent delivery mode set");
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSDestination()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                string text = "myMessage";
+                ITextMessage message = context.CreateTextMessage(text);
+
+                testPeer.ExpectTransfer(m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                Assert.IsNull(message.NMSDestination, "Should not yet have a NMSDestination");
+
+                producer.Send(destination, message);
+
+                Assert.AreEqual(destination, message.NMSDestination, "Should have had NMSDestination set");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSTimestamp()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create matcher to expect the absolute-expiry-time field of the properties section to
+                // be set to a value greater than 'now'+ttl, within a delta.
+
+                DateTime creationLower = DateTime.UtcNow;
+                DateTime creationUpper = creationLower + TimeSpan.FromMilliseconds(3000);
+
+                var text = "myMessage";
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.That(m.Properties.CreationTime.Ticks, Is.GreaterThanOrEqualTo(creationLower.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.That(m.Properties.CreationTime.Ticks, Is.LessThanOrEqualTo(creationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+                });
+
+                ITextMessage message = context.CreateTextMessage(text);
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSExpirationRelatedAbsoluteExpiryAndTtlFields()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                uint ttl = 100_000;
+                DateTime currentTime = DateTime.UtcNow;
+                DateTime expirationLower = currentTime + TimeSpan.FromMilliseconds(ttl);
+                DateTime expirationUpper = currentTime + TimeSpan.FromMilliseconds(ttl) + TimeSpan.FromMilliseconds(5000);
+
+                // Create matcher to expect the absolute-expiry-time field of the properties section to
+                // be set to a value greater than 'now'+ttl, within a delta.
+                string text = "myMessage";
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.AreEqual(ttl, m.Header.Ttl);
+                    Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.GreaterThanOrEqualTo(expirationLower.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.LessThanOrEqualTo(expirationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+                });
+
+                ITextMessage message = context.CreateTextMessage(text);
+                producer.TimeToLive = TimeSpan.FromMilliseconds(ttl);
+                producer.Priority = NMSConstants.defaultPriority;
+                producer.DeliveryMode = NMSConstants.defaultDeliveryMode;
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessagesAreProducedWithProperDefaultPriorityWhenNoPrioritySpecified()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                byte priority = 4;
+
+                testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage();
+                Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);
+
+                producer.Send(destination, message);
+
+                Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsNMSPriority()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                byte priority = 9;
+
+                testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage();
+                Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);
+
+                producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                producer.Priority = (MsgPriority) priority;
+                producer.TimeToLive = NMSConstants.defaultTimeToLive;
+                producer.Send(destination, message);
+
+                Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSMessageId()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                string text = "myMessage";
+                string actualMessageId = null;
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.IsNotEmpty(m.Properties.MessageId);
+                    actualMessageId = m.Properties.MessageId;
+                });
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage(text);
+                Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
+
+                producer.Send(destination, message);
+
+                Assert.IsNotNull(message.NMSMessageId);
+                Assert.IsNotEmpty(message.NMSMessageId, "NMSMessageId should be set");
+                Assert.IsTrue(message.NMSMessageId.StartsWith("ID:"), "MMS 'ID:' prefix not found");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+                // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
+                Assert.AreEqual(message.NMSMessageId, actualMessageId, "Expected NMSMessageId value to be present in AMQP message");
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageWithDisableMessageIdHint()
+        {
+            DoSendingMessageWithDisableMessageIdHintTestImpl(false);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageWithDisableMessageIdHintAndExistingMessageId()
+        {
+            DoSendingMessageWithDisableMessageIdHintTestImpl(true);
+        }
+
+        private void DoSendingMessageWithDisableMessageIdHintTestImpl(bool existingId)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                string text = "myMessage";
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.IsNull(m.Properties.MessageId); // Check there is no message-id value;
+                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+                });
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage(text);
+
+                Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
+
+                if (existingId)
+                {
+                    string existingMessageId = "ID:this-should-be-overwritten-in-send";
+                    message.NMSMessageId = existingMessageId;
+                    Assert.AreEqual(existingMessageId, message.NMSMessageId, "NMSMessageId should now be se");
+                }
+
+                producer.DisableMessageID = true;
+
+                producer.Send(destination, message);
+
+                Assert.IsNull(message.NMSMessageId, "NMSMessageID should be null");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        // TODO No connection listener in nms context
+        // [Test, Timeout(20_000)]
+        // public void TestRemotelyCloseProducer()
+        // {
+        //     string breadCrumb = "ErrorMessageBreadCrumb";
+        //
+        //     ManualResetEvent producerClosed = new ManualResetEvent(false);
+        //     Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+        //     mockConnectionListener
+        //         .Setup(listener => listener.OnProducerClosed(It.IsAny<NmsMessageProducer>(), It.IsAny<Exception>()))
+        //         .Callback(() => { producerClosed.Set(); });
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         NmsContext context = (NmsContext) EstablishNMSContext(testPeer);
+        //         context.AddConnectionListener(mockConnectionListener.Object);
+        //
+        //         testPeer.ExpectBegin();
+        //         ISession session = context.CreateSession(AcknowledgementMode.AutoAcknowledge);
+        //
+        //         // Create a producer, then remotely end it afterwards.
+        //         testPeer.ExpectSenderAttach();
+        //         testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, breadCrumb, delayBeforeSend: 10);
+        //
+        //         IQueue destination = session.GetQueue("myQueue");
+        //         IMessageProducer producer = session.CreateProducer(destination);
+        //
+        //         // Verify the producer gets marked closed
+        //         testPeer.WaitForAllMatchersToComplete(1000);
+        //
+        //         Assert.True(producerClosed.WaitOne(TimeSpan.FromMilliseconds(1000)), "Producer closed callback didn't trigger");
+        //         Assert.That(() => producer.DisableMessageID, Throws.Exception.InstanceOf<IllegalStateException>(), "Producer never closed");
+        //
+        //         // Try closing it explicitly, should effectively no-op in client.
+        //         // The test peer will throw during close if it sends anything.
+        //         producer.Close();
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestSendWhenLinkCreditIsZeroAndTimeout()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500");
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                ITextMessage message = context.CreateTextMessage("text");
+
+                // Expect the producer to attach. Don't send any credit so that the client will
+                // block on a send and we can test our timeouts.
+                testPeer.ExpectSenderAttachWithoutGrantingCredit();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                var producer = context.CreateProducer();
+
+                Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out.");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendTimesOutWhenNoDispositionArrives()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500");
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                ITextMessage message = context.CreateTextMessage("text");
+
+                // Expect the producer to attach and grant it some credit, it should send
+                // a transfer which we will not send any response for which should cause the
+                // send operation to time out.
+                testPeer.ExpectSenderAttach();
+                testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                var producer = context.CreateProducer();
+
+                Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out.");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendWorksWhenConnectionNotStarted()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectTransfer(Assert.IsNotNull);
+
+                producer.Send(destination, context.CreateMessage());
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                producer.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendWorksAfterConnectionStopped()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectTransfer(Assert.IsNotNull);
+
+                context.Stop();
+
+                producer.Send(destination, context.CreateMessage());
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                producer.Close();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessagePersistentSetsBatchableFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+                    stateMatcher: Assert.IsNull,
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    batchable: false);
+
+                IMessage message = context.CreateMessage();
+                producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                producer.Priority = MsgPriority.Normal;
+                producer.TimeToLive = NMSConstants.defaultTimeToLive;
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageNonPersistentSetsBatchableFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+                    stateMatcher: Assert.IsNull,
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    batchable: false);
+
+                IMessage message = context.CreateMessage();
+                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+                producer.Priority = MsgPriority.Normal;
+                producer.TimeToLive = NMSConstants.defaultTimeToLive;
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs
new file mode 100644
index 0000000..88d05df
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Apache.NMS;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    [TestFixture]
+    public class ProducerIntegrationAsyncTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestSentAsyncIsAsynchronous()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Create and transfer a new message
+                String text = "myMessage";
+                testPeer.ExpectTransfer(messageMatcher: m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value),
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    stateMatcher: Assert.IsNull,
+                    dispositionDelay: 10); // 10ms should be enough
+                testPeer.ExpectClose();
+
+                ITextMessage message = session.CreateTextMessage(text);
+                var sendTask = producer.SendAsync(message);
+                // Instantly check if its not completed yet, we want async, so it should not be completed right after 
+                Assert.AreEqual(false, sendTask.IsCompleted);
+                
+                // And now wait for task to complete
+                sendTask.Wait(20_000);
+
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestSentAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Create and transfer a new message
+                String text = "myMessage";
+                testPeer.ExpectTransfer(messageMatcher: m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value),
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    stateMatcher: Assert.IsNull,
+                    dispositionDelay: 10); // 10ms should be enough
+                testPeer.ExpectClose();
+
+                ITextMessage message = session.CreateTextMessage(text);
+                await producer.SendAsync(message);
+              
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+       
+
+        [Test, Timeout(20_000)]
+        public async Task TestProducerWorkWithAsyncAwait()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Create and transfer a new message, explicitly setting the deliveryMode on the
+                // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+                // that the producer ignores this value and sends the message as PERSISTENT(/durable)
+                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = session.CreateTextMessage();
+                textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
+                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode);
+
+                await producer.SendAsync(textMessage);
+
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                connection.Close();
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
index 4fbdc29..da336e8 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
@@ -227,5 +227,62 @@
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
+       
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                
+                string topicName = "myTopic";
+                ITopic topic = session.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+                
+                IMessageConsumer durableConsumer = session.CreateSharedConsumer(topic, subscriptionName, null);//, false);
+                // IMessageConsumer durableConsumer = session.CreateDurableConsumer(topic, subscriptionName, null, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(20000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedDurableConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                
+                string topicName = "myTopic";
+                ITopic topic = session.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+                
+                IMessageConsumer durableConsumer = session.CreateSharedDurableConsumer(topic, subscriptionName, null); //, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
index e8c3f6f..b5688d6 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
@@ -20,7 +20,7 @@
 
 namespace NMS.AMQP.Test.TestAmqp
 {
-    class NLogAdapter : ITrace
+    public class NLogAdapter : ITrace
     {
         private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
 
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 164f141..eccf3e9 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -555,6 +555,7 @@
                         Handle = context.Command.Handle,
                         LinkName = context.Command.LinkName,
                         Target = context.Command.Target,
+                        OfferedCapabilities = new []{ SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS }
                     };
 
                     if (refuseLink)
@@ -605,7 +606,49 @@
 
             ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
         }
+        
+        public void ExpectSharedDurableSubscriberAttach(string topicName, string subscriptionName)
+        {
+            Action<string> linkNameMatcher = linkName => Assert.AreEqual(subscriptionName, linkName);
 
+            Action<object> sourceMatcher = o =>
+            {
+                Assert.IsInstanceOf<Source>(o);
+                var source = (Source) o;
+                Assert.AreEqual(topicName, source.Address);
+                Assert.IsFalse(source.Dynamic);
+                Assert.AreEqual(TerminusDurability.UNSETTLED_STATE, (TerminusDurability) source.Durable);
+                Assert.AreEqual(TerminusExpiryPolicy.NEVER, source.ExpiryPolicy);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.ATTACH_CAPABILITIES_TOPIC);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.SHARED);
+            };
+
+            Action<Target> targetMatcher = Assert.IsNotNull;
+
+            ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
+        }
+
+        public void ExpectSharedSubscriberAttach(string topicName, string subscriptionName)
+        {
+            Action<string> linkNameMatcher = linkName => Assert.AreEqual(subscriptionName+"|volatile1", linkName);
+
+            Action<object> sourceMatcher = o =>
+            {
+                Assert.IsInstanceOf<Source>(o);
+                var source = (Source) o;
+                Assert.AreEqual(topicName, source.Address);
+                Assert.IsFalse(source.Dynamic);
+                // Assert.AreEqual(TerminusDurability.UNSETTLED_STATE, (TerminusDurability) source.Durable);
+                // Assert.AreEqual(TerminusExpiryPolicy.NEVER, source.ExpiryPolicy);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.ATTACH_CAPABILITIES_TOPIC);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.SHARED);
+            };
+
+            Action<Target> targetMatcher = Assert.IsNotNull;
+
+            ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
+        }
+        
         public void ExpectDetach(bool expectClosed, bool sendResponse, bool replyClosed, Symbol errorType = null, String errorMessage = null)
         {
             var detachMatcher = new FrameMatcher<Detach>()