Merge pull request #60 from michaelandrepearce/AMQNET-637

NMS 2.0 Work
diff --git a/README.md b/README.md
index 17050b8..0a3cccd 100644
--- a/README.md
+++ b/README.md
@@ -70,17 +70,21 @@
 | IConnection | Y | The ConnectionInterruptedListener event and the ConnectionResumedListener are not supported. |
 | ProducerTransformerDelegate | N | Any member access should throw a NotSupportedException. |
 | ConsumerTransformerDelegate | N | Any member access should throw a NotSupportedException. |
-| ISession | Y | 
+| ISession | Y | |
+[ INMSContext | Y | |
 | IQueue | Y | |
 | ITopic | Y | |
 | ITemporaryQueue | Y | |
 | ITemporaryTopic | Y | |
 | IMessageProducer | Y * | Anonymous producers are only supported on connections with the ANONYMOUS-RELAY capability. |
+| INMSProducer | Y | |
 | MsgDeliveryMode.Persistent | Y | Producers will block on send until an outcome is received or will timeout after waiting the RequestTimeout timespan amount. Exceptions may be throw depending on the outcome or if the producer times out. |
 | MsgDeliveryMode.NonPersistent | Y | Producers will not block on send nor expect to receive an outcome. Should an exception be raised from the outcome the exception will be delivered using the the connection ExceptionListener. |
 | IMessageConsumer | Y | |
+| INMSConsumer | Y | |
 | Durable Consumers | Y | |
-| IQueueBrowser | N | The provider will throw NotImplementedException for the ISession create methods. |
+| Shared Consumers | Y | |
+| IQueueBrowser | Y | |
 | Configurable NMSMessageID and amqp serializtion | N | For future consideration. The prodiver will generate a MessageID from a sequence and serialize it as a string. |
 | Flow control configuration | N | For future consideration. The provider will use amqpnetlite defaults except for initial link credits which is 200. |
 | Object Deserialization Policy | N | For future consideration. The provider considers all Dotnet serialized objects in Object Message bodies are safe to deserialize. |
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index 90f8c20..9fb8ebd 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -95,7 +95,7 @@
     <ItemGroup>
         <!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
         <PackageReference Include="AMQPNetLite.Core" Version="2.4.0" />
-        <PackageReference Include="Apache.NMS" Version="1.8.0" />
+        <PackageReference Include="Apache.NMS" Version="2.0.0" />
         <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
     </ItemGroup>
 </Project>
diff --git a/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
index bfb90ca..8561a57 100644
--- a/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
@@ -35,6 +35,7 @@
         IDestination NMSReplyTo { get; set; }
         DateTime NMSTimestamp { get; set; }
         string NMSType { get; set; }
+        DateTime DeliveryTime { get; set; }
         string GroupId { get; set; }
         uint GroupSequence { get; set; }
         DateTime? Expiration { get; set; }
@@ -52,5 +53,7 @@
         object ProviderMessageIdObject { get; set; }
 
         INmsMessageFacade Copy();
+
+        bool HasBody();
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
index cea2edc..c6535aa 100644
--- a/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
@@ -19,6 +19,6 @@
 {
     public interface INmsObjectMessageFacade : INmsMessageFacade
     {
-        object Body { get; set; }        
+        object Object { get; set; }        
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsBytesMessage.cs b/src/NMS.AMQP/Message/NmsBytesMessage.cs
index 62eaa5b..199e716 100644
--- a/src/NMS.AMQP/Message/NmsBytesMessage.cs
+++ b/src/NMS.AMQP/Message/NmsBytesMessage.cs
@@ -449,5 +449,19 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return !facade.HasBody() || type.IsAssignableFrom(typeof(byte[]));
+        }
+        
+        protected override T DoGetBody<T>() {
+            if (!facade.HasBody()) {
+                return default;
+            }
+
+            object o = Content;
+            return (T) o;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMapMessage.cs b/src/NMS.AMQP/Message/NmsMapMessage.cs
index 0d74176..53005f2 100644
--- a/src/NMS.AMQP/Message/NmsMapMessage.cs
+++ b/src/NMS.AMQP/Message/NmsMapMessage.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using Apache.NMS.AMQP.Message.Facade;
 using Apache.NMS.Util;
 
@@ -55,5 +56,18 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return !facade.HasBody() || type.IsAssignableFrom(typeof(IPrimitiveMap));
+        }
+        
+        protected override T DoGetBody<T>() {
+            if (!facade.HasBody()) {
+                return default;
+            }
+
+            return (T) Body;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessage.cs b/src/NMS.AMQP/Message/NmsMessage.cs
index 16871ab..1ca0f14 100644
--- a/src/NMS.AMQP/Message/NmsMessage.cs
+++ b/src/NMS.AMQP/Message/NmsMessage.cs
@@ -33,7 +33,9 @@
 
         public INmsMessageFacade Facade { get; }
 
-        public IPrimitiveMap Properties => properties ?? (properties = new MessagePropertyIntercepter(this, Facade.Properties, IsReadOnlyProperties));
+        public IPrimitiveMap Properties => properties ??
+                                           (properties = new MessagePropertyIntercepter(this, Facade.Properties,
+                                               IsReadOnlyProperties));
 
         public string NMSCorrelationID
         {
@@ -109,6 +111,12 @@
             set => Facade.NMSType = value;
         }
 
+        public DateTime NMSDeliveryTime
+        {
+            get => Facade.DeliveryTime;
+            set => Facade.DeliveryTime = value;
+        }
+
         public string NMSXGroupId
         {
             get => Facade.GroupId;
@@ -255,5 +263,25 @@
             target.IsReadOnlyProperties = IsReadOnlyProperties;
             target.NmsAcknowledgeCallback = NmsAcknowledgeCallback;
         }
+
+        public virtual bool IsBodyAssignableTo(Type type)
+        {
+            return true;
+        }
+
+        public T Body<T>()
+        {
+            if (IsBodyAssignableTo(typeof(T)))
+            {
+                return DoGetBody<T>();
+            }
+
+            throw new MessageFormatException("Message body cannot be read as type: " + typeof(T));
+        }
+
+        protected virtual T DoGetBody<T>()
+        {
+            return default;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessageTransformation.cs b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
index ef1acb1..b9c4a3c 100644
--- a/src/NMS.AMQP/Message/NmsMessageTransformation.cs
+++ b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
@@ -16,6 +16,7 @@
  */
 
 using System.Collections;
+using Apache.NMS.AMQP.Util.Types.Map;
 
 namespace Apache.NMS.AMQP.Message
 {
@@ -106,7 +107,7 @@
             CopyMap(source.Properties, target.Properties);
         }
 
-        private static void CopyMap(IPrimitiveMap source, IPrimitiveMap target)
+        public static void CopyMap(IPrimitiveMap source, IPrimitiveMap target)
         {
             foreach (object key in source.Keys)
             {
@@ -151,6 +152,12 @@
                     case IDictionary dictionaryValue:
                         target.SetDictionary(name, dictionaryValue);
                         break;
+                    case object objectValue:
+                        if (target is PrimitiveMapBase primitiveMapBase)
+                        {
+                            primitiveMapBase.SetObject(name, objectValue);
+                        }
+                        break;
                 }
             }
         }
diff --git a/src/NMS.AMQP/Message/NmsObjectMessage.cs b/src/NMS.AMQP/Message/NmsObjectMessage.cs
index 556f9c2..87f0dd7 100644
--- a/src/NMS.AMQP/Message/NmsObjectMessage.cs
+++ b/src/NMS.AMQP/Message/NmsObjectMessage.cs
@@ -16,6 +16,8 @@
  */
 
 using System;
+using System.Reflection;
+using System.Runtime.CompilerServices;
 using Apache.NMS.AMQP.Message.Facade;
 
 namespace Apache.NMS.AMQP.Message
@@ -31,13 +33,13 @@
 
         public object Body
         {
-            get => this.facade.Body;
+            get => this.facade.Object;
             set
             {
                 CheckReadOnlyBody();
                 try
                 {
-                    this.facade.Body = value;
+                    this.facade.Object = value;
                 }
                 catch (Exception e)
                 {
@@ -57,5 +59,22 @@
             CopyInto(copy);
             return copy;
         }
+
+
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            if (!facade.HasBody())
+            {
+                return true;
+            }
+
+            return type.IsInstanceOfType(Body);
+        }
+        
+        protected override T DoGetBody<T>()
+        {
+            return (T) Body;
+        }
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsStreamMessage.cs b/src/NMS.AMQP/Message/NmsStreamMessage.cs
index 03d6150..0e085a0 100644
--- a/src/NMS.AMQP/Message/NmsStreamMessage.cs
+++ b/src/NMS.AMQP/Message/NmsStreamMessage.cs
@@ -465,5 +465,11 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return false;
+        }
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsTextMessage.cs b/src/NMS.AMQP/Message/NmsTextMessage.cs
index 9334a36..7932207 100644
--- a/src/NMS.AMQP/Message/NmsTextMessage.cs
+++ b/src/NMS.AMQP/Message/NmsTextMessage.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using Apache.NMS.AMQP.Message.Facade;
 
 namespace Apache.NMS.AMQP.Message
@@ -49,5 +50,16 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return !facade.HasBody() || type.IsAssignableFrom(typeof(string));
+        }
+        
+        protected override T DoGetBody<T>()
+        {
+            object o = Text;
+            return (T) o;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
index a4115c8..2aefdb1 100644
--- a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
+++ b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
@@ -25,6 +25,6 @@
         public NmsProducerId ProducerId { get; set; }
         public NmsProducerInfo ProducerInfo { get; set; }
         public NmsMessage Message { get; set; }
-        public bool SendAsync { get; set; }
+        public bool FireAndForget { get; set; }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index 24f3c13..f57c5d4 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -64,6 +64,12 @@
         public int MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
         public int IdleTimeOut { get; set; } = DEFAULT_IDLE_TIMEOUT;
         
+        public bool AnonymousRelaySupported { get; set; }
+        
+        public bool DelayedDeliverySupported { get; set; }
+        
+        public bool SharedSubsSupported { get; set; }
+        
 
         public void SetClientId(string clientId, bool explicitClientId)
         {
diff --git a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
index 0df66eb..70cf529 100644
--- a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
@@ -34,7 +34,9 @@
         public string Selector { get; set; }
         public bool NoLocal { get; set; }
         public string SubscriptionName { get; set; }
+        public bool IsExplicitClientId { get; set; }
         public bool IsDurable { get; set; }
+        public bool IsShared { get; set; }
         public bool LocalMessageExpiry { get; set; }
         public bool IsBrowser { get; set; }
         public int LinkCredit { get; set; } = DEFAULT_CREDIT;
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index 7f3f1c5..88101bb 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -181,6 +181,26 @@
             }

         }

 

+        public INMSContext CreateContext()

+        {

+            return new NmsContext((NmsConnection)CreateConnection(), AcknowledgementMode.AutoAcknowledge);

+        }

+

+        public INMSContext CreateContext(AcknowledgementMode acknowledgementMode)

+        {

+            return new NmsContext((NmsConnection)CreateConnection(), acknowledgementMode);

+        }

+

+        public INMSContext CreateContext(string userName, string password)

+        {

+            return new NmsContext((NmsConnection)CreateConnection(userName, password), AcknowledgementMode.AutoAcknowledge);

+        }

+

+        public INMSContext CreateContext(string userName, string password, AcknowledgementMode acknowledgementMode)

+        {

+            return new NmsContext((NmsConnection)CreateConnection(userName, password), acknowledgementMode);

+        }

+

         public Uri BrokerUri

         {

             get => brokerUri;

@@ -247,7 +267,7 @@
             }

             else

             {

-                connectionInfo.SetClientId(ClientIdGenerator.GenerateId().ToString(), false);

+                connectionInfo.SetClientId(ClientIdGenerator.GenerateId(), false);

             }

 

             return connectionInfo;

diff --git a/src/NMS.AMQP/NmsConsumer.cs b/src/NMS.AMQP/NmsConsumer.cs
new file mode 100644
index 0000000..9a2d40d
--- /dev/null
+++ b/src/NMS.AMQP/NmsConsumer.cs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsConsumer : INMSConsumer
+    {
+        
+        private readonly ISession session;
+        private readonly NmsMessageConsumer consumer;
+
+        public NmsConsumer(ISession session, NmsMessageConsumer consumer) {
+            this.session = session;
+            this.consumer = consumer;
+        }
+
+        public void Dispose()
+        {
+            consumer.Dispose();
+        }
+
+        public IMessage Receive()
+        {
+            return consumer.Receive();
+        }
+
+        public IMessage Receive(TimeSpan timeout)
+        {
+            return consumer.Receive(timeout);
+        }
+
+        public IMessage ReceiveNoWait()
+        {
+            return consumer.ReceiveNoWait();
+        }
+
+        public T ReceiveBody<T>()
+        {
+            return consumer.ReceiveBody<T>();
+        }
+
+        public T ReceiveBody<T>(TimeSpan timeout)
+        {
+            return consumer.ReceiveBody<T>(timeout);
+        }
+
+        public T ReceiveBodyNoWait<T>()
+        {
+            return consumer.ReceiveBodyNoWait<T>();
+        }
+
+        public void Close()
+        {
+            consumer.Close();
+        }
+
+        public string MessageSelector => consumer.MessageSelector;
+
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get => consumer.ConsumerTransformer; 
+            set => consumer.ConsumerTransformer = value; 
+        }
+
+        event MessageListener INMSConsumer.Listener
+        {
+            add => ((IMessageConsumer)consumer).Listener += value;
+            remove => ((IMessageConsumer)consumer).Listener -= value;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsContext.cs b/src/NMS.AMQP/NmsContext.cs
new file mode 100644
index 0000000..260647e
--- /dev/null
+++ b/src/NMS.AMQP/NmsContext.cs
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsContext : INMSContext
+    {
+        private readonly object syncRoot = new object();
+
+        private readonly NmsConnection connection;
+        private readonly AtomicLong connectionRefCount;
+        public AcknowledgementMode AcknowledgementMode { get; }
+
+        private NmsSession session;
+        private NmsMessageProducer sharedProducer;
+        private bool autoStart = true;
+
+        public NmsContext(NmsConnection connection, AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.AcknowledgementMode = acknowledgementMode;
+            this.connectionRefCount = new AtomicLong(1);
+        }
+
+        private NmsContext(NmsConnection connection, AcknowledgementMode acknowledgementMode,
+            AtomicLong connectionRefCount)
+        {
+            this.connection = connection;
+            this.AcknowledgementMode = acknowledgementMode;
+            this.connectionRefCount = connectionRefCount;
+        }
+
+        public void Dispose()
+        {
+            connection.Dispose();
+        }
+
+        public void Start()
+        {
+            connection.Start();
+        }
+
+        public bool IsStarted { get => connection.IsStarted; }
+        
+        public void Stop()
+        {
+            connection.Stop();
+        }
+
+        public INMSContext CreateContext(AcknowledgementMode acknowledgementMode)
+        {
+            if (connectionRefCount.Get() == 0) {
+                throw new IllegalStateException("The Connection is closed");
+            }
+            
+            connectionRefCount.IncrementAndGet();
+            return new NmsContext(connection, acknowledgementMode, connectionRefCount);
+        }
+
+        public INMSProducer CreateProducer()
+        {
+            if (sharedProducer == null) {
+                lock (syncRoot) {
+                    if (sharedProducer == null) {
+                        sharedProducer = (NmsMessageProducer) GetSession().CreateProducer();
+                    }
+                }
+            }
+            return new NmsProducer(GetSession(), sharedProducer);
+        }
+
+
+        public INMSConsumer CreateConsumer(IDestination destination)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination)));
+        }
+
+        public INMSConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination, selector)));
+        }
+
+        public INMSConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination, selector, noLocal)));
+        }
+
+        public INMSConsumer CreateDurableConsumer(ITopic destination, string subscriptionName)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateDurableConsumer(destination, subscriptionName)));
+        }
+
+        public INMSConsumer CreateDurableConsumer(ITopic destination, string subscriptionName, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateDurableConsumer(destination, subscriptionName, selector)));
+        }
+
+        public INMSConsumer CreateDurableConsumer(ITopic destination, string subscriptionName, string selector, bool noLocal)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateDurableConsumer(destination, subscriptionName, selector, noLocal)));
+        }
+
+        public INMSConsumer CreateSharedConsumer(ITopic destination, string subscriptionName)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedConsumer(destination, subscriptionName)));
+        }
+
+        public INMSConsumer CreateSharedConsumer(ITopic destination, string subscriptionName, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedConsumer(destination, subscriptionName, selector)));
+        }
+
+        public INMSConsumer CreateSharedDurableConsumer(ITopic destination, string subscriptionName)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedDurableConsumer(destination, subscriptionName)));
+        }
+
+        public INMSConsumer CreateSharedDurableConsumer(ITopic destination, string subscriptionName, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedDurableConsumer(destination, subscriptionName, selector)));
+        }
+
+        public void Unsubscribe(string name)
+        {
+            GetSession().Unsubscribe(name);
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            return GetSession().CreateBrowser(queue);
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            return GetSession().CreateBrowser(queue, selector);
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            return GetSession().GetQueue(name);
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            return GetSession().GetTopic(name);
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            return GetSession().CreateTemporaryQueue();
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            return GetSession().CreateTemporaryTopic();
+        }
+
+        public IMessage CreateMessage()
+        {
+            return GetSession().CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return GetSession().CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            return GetSession().CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return GetSession().CreateMapMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            return GetSession().CreateObjectMessage(body);
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return GetSession().CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return GetSession().CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return GetSession().CreateStreamMessage();
+        }
+
+        public void Close()
+        {
+            NMSException failure = null;
+
+            try
+            {
+                session?.Close();
+            } catch (NMSException jmse)
+            {
+                failure = jmse;
+            }
+
+            if (connectionRefCount.DecrementAndGet() == 0) {
+                try {
+                    connection.Close();
+                } catch (NMSException jmse) {
+                    if (failure == null)
+                    {
+                        failure = jmse;
+                    }
+                }
+            }
+
+            if (failure != null) {
+                throw failure;
+            }
+        }
+
+        public void Recover()
+        {
+            GetSession().Recover();
+        }
+
+        public void Acknowledge()
+        {
+            GetSession().Acknowledge();
+        }
+
+        public void Commit()
+        {
+            GetSession().Commit();
+        }
+
+        public void Rollback()
+        {
+            GetSession().Rollback();
+        }
+
+        public void PurgeTempDestinations()
+        {
+            connection.PurgeTempDestinations();
+        }
+        
+        
+        private NmsSession GetSession() {
+            if (session == null) {
+                lock (syncRoot) {
+                    if (session == null)
+                    {
+                        session = (NmsSession) connection.CreateSession(AcknowledgementMode);
+                    }
+                }
+            }
+            return session;
+        }
+        
+        private NmsConsumer StartIfNeeded(NmsConsumer consumer) {
+            if (autoStart) {
+                connection.Start();
+            }
+            return consumer;
+        }
+        
+
+        public ConsumerTransformerDelegate ConsumerTransformer { get => session.ConsumerTransformer; set => session.ConsumerTransformer = value; }
+        
+        public ProducerTransformerDelegate ProducerTransformer { get => session.ProducerTransformer; set => session.ProducerTransformer = value; }
+        
+        public TimeSpan RequestTimeout { get => session.RequestTimeout; set => session.RequestTimeout = value; }
+        
+        public bool Transacted => session.Transacted;
+        
+        public string ClientId { get => connection.ClientId; set => connection.ClientId = value; }
+        
+        public bool AutoStart { get => autoStart; set => autoStart = value; }
+        
+        public event SessionTxEventDelegate TransactionStartedListener;
+        
+        public event SessionTxEventDelegate TransactionCommittedListener;
+        
+        public event SessionTxEventDelegate TransactionRolledBackListener;
+        
+        public event ExceptionListener ExceptionListener;
+        
+        public event ConnectionInterruptedListener ConnectionInterruptedListener;
+        
+        public event ConnectionResumedListener ConnectionResumedListener;
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs b/src/NMS.AMQP/NmsDurableMessageConsumer.cs
similarity index 82%
rename from src/NMS.AMQP/NmsDurableTopicSubscriber.cs
rename to src/NMS.AMQP/NmsDurableMessageConsumer.cs
index bc446bb..60ef3c8 100644
--- a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
+++ b/src/NMS.AMQP/NmsDurableMessageConsumer.cs
@@ -19,16 +19,19 @@
 
 namespace Apache.NMS.AMQP
 {
-    public class NmsDurableTopicSubscriber : NmsMessageConsumer
+    public class NmsDurableMessageConsumer : NmsMessageConsumer
     {
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+        public NmsDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
         {
         }
 
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+        public NmsDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
         {
         }
 
         protected override bool IsDurableSubscription => true;
+        
+        protected override bool IsSharedSubscription => false;
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index 121a93c..36abe8f 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -55,9 +55,13 @@
                 Destination = destination,
                 Selector = selector,
                 NoLocal = noLocal,
+                IsExplicitClientId = Session.Connection.ConnectionInfo.IsExplicitClientId,
                 SubscriptionName = name,
-                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry,
-                IsDurable = IsDurableSubscription
+                IsShared = IsSharedSubscription,
+                IsDurable = IsDurableSubscription,
+                IsBrowser =  IsBrowser,
+                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry
+
             };
             deliveryTask = new MessageDeliveryTask(this);
             
@@ -74,6 +78,10 @@
         public IDestination Destination => Info.Destination;
 
         protected virtual bool IsDurableSubscription => false;
+        
+        protected virtual bool IsSharedSubscription => false;
+        
+        protected virtual bool IsBrowser => false;
 
         public void Dispose()
         {
@@ -101,6 +109,8 @@
 
         public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
 
+        public string MessageSelector => Info.Selector;
+
         event MessageListener IMessageConsumer.Listener
         {
             add
@@ -134,6 +144,21 @@
                 }
             }
         }
+        
+        public T ReceiveBody<T>()
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            while (true)
+            {
+                if (started)
+                {
+                    return ReceiveBodyInternal<T>(-1);
+                }
+            }
+        }
+
 
         public IMessage ReceiveNoWait()
         {
@@ -142,6 +167,14 @@
 
             return started ? ReceiveInternal(0) : null;
         }
+        
+        public T ReceiveBodyNoWait<T>()
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            return started ? ReceiveBodyInternal<T>(0) : default;
+        }
 
         public IMessage Receive(TimeSpan timeout)
         {
@@ -171,6 +204,35 @@
                 }
             }
         }
+        
+        public T ReceiveBody<T>(TimeSpan timeout)
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            int timeoutInMilliseconds = (int) timeout.TotalMilliseconds;
+
+            if (started)
+            {
+                return ReceiveBodyInternal<T>(timeoutInMilliseconds);
+            }
+
+            long deadline = GetDeadline(timeoutInMilliseconds);
+
+            while (true)
+            {
+                timeoutInMilliseconds = (int) (deadline - DateTime.UtcNow.Ticks / 10_000L);
+                if (timeoutInMilliseconds < 0)
+                {
+                    return default;
+                }
+
+                if (started)
+                {
+                    return ReceiveBodyInternal<T>(timeoutInMilliseconds);
+                }
+            }
+        }
 
         private void CheckMessageListener()
         {
@@ -331,6 +393,42 @@
 
         private IMessage ReceiveInternal(int timeout)
         {
+            return ReceiveInternal(timeout, envelope =>
+            {
+                IMessage message = envelope.Message.Copy();
+                AckFromReceive(envelope);
+                return message;
+            });
+        }
+        
+        private T ReceiveBodyInternal<T>(int timeout)
+        {
+            return ReceiveInternal<T>(timeout, envelope =>
+            {
+                try
+                {
+                    T body = envelope.Message.Body<T>();
+                    AckFromReceive(envelope);
+                    return body;
+                }
+                catch (MessageFormatException mfe)
+                {
+                    // Should behave as if receiveBody never happened in these modes.
+                    if (acknowledgementMode == AcknowledgementMode.AutoAcknowledge ||
+                        acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge) {
+
+                        envelope.EnqueueFirst = true;
+                        OnInboundMessage(envelope);
+                    }
+
+                    throw mfe;
+                }
+            });
+        }
+
+
+        private T ReceiveInternal<T>(int timeout, Func<InboundMessageDispatch, T> func)
+        {
             try
             {
                 long deadline = 0;
@@ -352,7 +450,7 @@
                         throw NMSExceptionSupport.Create(failureCause);
 
                     if (envelope == null)
-                        return null;
+                        return default;
 
                     if (IsMessageExpired(envelope))
                     {
@@ -383,8 +481,7 @@
                             Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
                         }
 
-                        AckFromReceive(envelope);
-                        return envelope.Message.Copy();
+                        return func.Invoke(envelope);
                     }
                 }
             }
@@ -397,6 +494,7 @@
                 throw ExceptionSupport.Wrap(ex, "Receive failed");
             }
         }
+        
 
         private static long GetDeadline(int timeout)
         {
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index e1b40ba..3f7e3db 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -31,6 +31,7 @@
         private readonly AtomicLong messageSequence = new AtomicLong();
 
         private Exception failureCause;
+        private TimeSpan deliveryDelay = TimeSpan.Zero;
         private MsgDeliveryMode deliveryMode = MsgDeliveryMode.Persistent;
         private TimeSpan timeToLive = NMSConstants.defaultTimeToLive;
         private TimeSpan requestTimeout;
@@ -85,7 +86,29 @@
         public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
         {
             CheckClosed();
-            session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp);
+            session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay);
+        }
+
+        public Task SendAsync(IMessage message)
+        {
+            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
+        }
+
+        public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
+        }
+
+        public Task SendAsync(IDestination destination, IMessage message)
+        {
+            return SendAsync(destination, message, deliveryMode, priority, timeToLive);
+        }
+
+        public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
+            TimeSpan timeToLive)
+        {
+            CheckClosed();
+            return session.SendAsync(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay);
         }
 
         public void Close()
@@ -231,6 +254,25 @@
             }
         }
 
+        public TimeSpan DeliveryDelay
+        {
+            get
+            {
+                CheckClosed();
+                return deliveryDelay;
+            }
+            set
+            {
+                if (!session.Connection.ConnectionInfo.DelayedDeliverySupported)
+                {
+                    throw new NotSupportedException("Delayed Delivery is not supported");
+                }
+                
+                CheckClosed();
+                deliveryDelay = value;
+            }
+        }
+        
         public Task OnConnectionRecovery(IProvider provider)
         {
             return provider.CreateResource(Info);
diff --git a/src/NMS.AMQP/NmsProducer.cs b/src/NMS.AMQP/NmsProducer.cs
new file mode 100644
index 0000000..974a185
--- /dev/null
+++ b/src/NMS.AMQP/NmsProducer.cs
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Threading.Tasks;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsProducer : INMSProducer
+    {
+        
+        private readonly ISession session;
+        private readonly NmsMessageProducer producer;
+        
+        // Message Headers
+        private String correlationId;
+        private String type;
+        private IDestination replyTo;
+
+        // Message Properties
+        private readonly IPrimitiveMap messageProperties = new PrimitiveMap();
+
+        /**
+         * Create a new JMSProducer instance.
+         *
+         * The producer is backed by the given Session object and uses the shared MessageProducer
+         * instance to send all of its messages.
+         *
+         * @param session
+         *      The Session that created this JMSProducer
+         * @param producer
+         *      The shared MessageProducer owned by the parent Session.
+         */
+        public NmsProducer(ISession session, NmsMessageProducer producer) {
+            this.session = session;
+            this.producer = producer;
+        }
+
+        
+        public void Dispose()
+        {
+            producer.Dispose();
+        }
+
+        public INMSProducer Send(IDestination destination, IMessage message)  {
+
+            if (message == null) {
+                throw new MessageFormatException("Message must not be null");
+            }
+            
+            NmsMessageTransformation.CopyMap(messageProperties, message.Properties);
+            
+            if (correlationId != null) {
+                message.NMSCorrelationID = correlationId;
+            }
+            if (type != null) {
+                message.NMSType = type;
+            }
+            if (replyTo != null) {
+                message.NMSReplyTo = replyTo;
+            }
+            
+            producer.Send(destination, message);
+            
+            return this;
+        }
+
+        public INMSProducer Send(IDestination destination, string body)
+        {
+            return Send(destination, CreateTextMessage(body));
+        }
+
+        public INMSProducer Send(IDestination destination, IPrimitiveMap body)
+        {
+            IMapMessage message = CreateMapMessage();
+            NmsMessageTransformation.CopyMap(body, message.Body);
+            return Send(destination, message);
+        }
+
+        public INMSProducer Send(IDestination destination, byte[] body)
+        {
+            return Send(destination, CreateBytesMessage(body));
+        }
+
+        public INMSProducer Send(IDestination destination, object body)
+        {
+            return Send(destination, CreateObjectMessage(body));
+        }
+
+        public async Task<INMSProducer> SendAsync(IDestination destination, IMessage message)
+        {
+            if (message == null) {
+                throw new MessageFormatException("Message must not be null");
+            }
+
+            NmsMessageTransformation.CopyMap(messageProperties, message.Properties);
+            
+            if (correlationId != null) {
+                message.NMSCorrelationID = correlationId;
+            }
+            if (type != null) {
+                message.NMSType = type;
+            }
+            if (replyTo != null) {
+                message.NMSReplyTo = replyTo;
+            }
+
+            await producer.SendAsync(destination, message);
+            return this;
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, string body)
+        {
+            return SendAsync(destination, CreateTextMessage(body));
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, IPrimitiveMap body)
+        {
+            IMapMessage message = CreateMapMessage();
+            NmsMessageTransformation.CopyMap(body, message.Body);
+            return SendAsync(destination, message);
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, byte[] body)
+        {
+            return SendAsync(destination, CreateBytesMessage(body));
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, object body)
+        {
+            return SendAsync(destination, CreateObjectMessage(body));
+        }
+
+        public INMSProducer ClearProperties()
+        {
+            messageProperties.Clear();
+            return this;
+        }
+
+
+        public IMessage CreateMessage()
+        {
+            return session.CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return session.CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            return session.CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return session.CreateMapMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            return session.CreateObjectMessage(body);
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return session.CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return session.CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
+
+        public void Close()
+        {
+            producer.Close();
+        }
+
+
+        public string NMSCorrelationID
+        {
+            get => correlationId;
+            set => correlationId = value;
+        }
+        
+        public INMSProducer SetNMSCorrelationID(string correlationID)
+        {
+            NMSCorrelationID = correlationID;
+            return this;
+        }
+
+
+        public IDestination NMSReplyTo
+        {
+            get => replyTo;
+            set => replyTo = value;
+        }
+
+        public INMSProducer SetNMSReplyTo(IDestination replyTo)
+        {
+            NMSReplyTo = replyTo;
+            return this;
+        }
+        
+        public string NMSType
+        {
+            get => type;
+            set => type = value;
+        }
+
+        public INMSProducer SetNMSType(string type)
+        {
+            NMSType = type;
+            return this;
+        }
+
+        public MsgDeliveryMode DeliveryMode
+        {
+            get => producer.DeliveryMode;
+            set => producer.DeliveryMode = value;
+        }
+        
+        public INMSProducer SetDeliveryMode(MsgDeliveryMode deliveryMode)
+        {
+            DeliveryMode = deliveryMode;
+            return this;
+        }
+
+        public TimeSpan TimeToLive
+        {
+            get => producer.TimeToLive;
+            set => producer.TimeToLive = value;
+        }
+
+        public INMSProducer SetTimeToLive(TimeSpan timeToLive)
+        {
+            TimeToLive = timeToLive;
+            return this;
+        }
+
+        public TimeSpan RequestTimeout
+        {
+            get => producer.RequestTimeout;
+            set => producer.RequestTimeout = value;
+        }
+
+        public MsgPriority Priority
+        {
+            get => producer.Priority;
+            set => producer.Priority = value;
+        }
+        
+        public INMSProducer SetPriority(MsgPriority priority)
+        {
+            Priority = priority;
+            return this;
+        }
+
+        public bool DisableMessageID
+        {
+            get => producer.DisableMessageID;
+            set => producer.DisableMessageID = value;
+        }
+        
+        public INMSProducer SetDisableMessageID(bool value)
+        {
+            DisableMessageID = value;
+            return this;
+        }
+
+        public bool DisableMessageTimestamp
+        {
+            get => producer.DisableMessageTimestamp;
+            set => producer.DisableMessageTimestamp = value;
+        }
+
+        public INMSProducer SetDisableMessageTimestamp(bool value)
+        {
+            DisableMessageTimestamp = value;
+            return this;
+        }
+
+        public TimeSpan DeliveryDelay
+        {
+            get => producer.DeliveryDelay;
+            set => producer.DeliveryDelay = value;
+        }
+        
+        public INMSProducer SetDeliveryDelay(TimeSpan deliveryDelay)
+        {
+            DeliveryDelay = deliveryDelay;
+            return this;
+        }
+        
+        public IPrimitiveMap Properties => messageProperties;
+
+        public INMSProducer SetProperty(string name, bool value)
+        {
+            messageProperties.SetBool(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, byte value)
+        {
+            messageProperties.SetByte(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, double value)
+        {
+            messageProperties.SetDouble(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, float value)
+        {
+            messageProperties.SetFloat(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, int value)
+        {
+            messageProperties.SetInt(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, long value)
+        {
+            messageProperties.SetLong(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, short value)
+        {
+            messageProperties.SetShort(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, char value)
+        {
+            messageProperties.SetChar(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, string value)
+        {
+            messageProperties.SetString(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, byte[] value)
+        {
+            messageProperties.SetBytes(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, IList value)
+        {
+            messageProperties.SetList(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, IDictionary value)
+        {
+            messageProperties.SetDictionary(name, value);
+            return this;
+        }
+
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get => producer.ProducerTransformer; 
+            set => producer.ProducerTransformer = value;
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsQueueBrowser.cs b/src/NMS.AMQP/NmsQueueBrowser.cs
new file mode 100644
index 0000000..e1eb348
--- /dev/null
+++ b/src/NMS.AMQP/NmsQueueBrowser.cs
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsQueueBrowser : IQueueBrowser, IEnumerator
+    {
+        private readonly object syncRoot = new object();
+
+        private readonly NmsSession session;
+        private readonly IQueue destination;
+        private readonly string selector;
+
+        private volatile NmsMessageConsumer consumer;
+
+        private IMessage current;
+        private readonly AtomicBool closed = new AtomicBool();
+        
+        public NmsQueueBrowser(NmsSession session, IQueue destination, string selector)
+        {
+            this.session = session;
+            this.destination = destination;
+            this.selector = selector;
+        }
+
+        public IEnumerator GetEnumerator()
+        {
+            CheckClosed();
+            CreateConsumer();
+
+            return this;
+        }
+
+        public bool MoveNext()
+        {
+            current = Next();
+
+            if (!session.IsStarted) {
+                DestroyConsumer();
+                return false;
+            }
+            
+            return current != null;
+        }
+        
+        private IMessage Next() {
+            while (true) {
+                IMessageConsumer consumer = this.consumer;
+                if (consumer == null) {
+                    return null;
+                }
+
+                IMessage next = null;
+
+                try {
+                    next = consumer.ReceiveNoWait();
+                } catch (NMSException e) {
+                    Tracer.WarnFormat("Error while receive the next message: {}", e.Message);
+                }
+
+                if (next == null) {
+                    DestroyConsumer();
+                }
+
+                return next;
+            }
+        }
+
+        public void Reset()
+        {
+            CheckClosed();
+            DestroyConsumer();
+            CreateConsumer();
+        }
+
+        public object Current
+        {
+            get => current;
+        }
+
+        public void Dispose()
+        {
+            Close();
+        }
+
+        public void Close()
+        {
+            if (closed.CompareAndSet(false, true)) {
+                DestroyConsumer();
+            }
+        }
+
+        public string MessageSelector => selector;
+        public IQueue Queue => destination;
+
+        private void CheckClosed()
+        {
+            if (closed)
+            {
+                throw new IllegalStateException("The MessageConsumer is closed");
+            }
+        }
+
+        private void CreateConsumer()
+        {
+            lock (syncRoot)
+            {
+                if (consumer == null)
+                {
+                    NmsMessageConsumer messageConsumer = new NmsQueueBrowserMessageConsumer(session.GetNextConsumerId(), session,
+                        destination, selector, false);
+
+                    messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
+
+                    // Assign only after fully created and initialized.
+                    consumer = messageConsumer;
+                }
+            }
+        }
+        
+        private void DestroyConsumer()
+        {
+            lock (syncRoot)
+            {
+                try
+                {
+                    consumer?.Close();
+                }
+                catch (NMSException e)
+                {
+                    Tracer.DebugFormat("Error closing down internal consumer: ", e);
+                }
+                finally
+                {
+                    consumer = null;
+                }
+            }
+        }
+
+        public class NmsQueueBrowserMessageConsumer : NmsMessageConsumer
+        {
+            public NmsQueueBrowserMessageConsumer(NmsConsumerId consumerId, NmsSession session,
+                IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination,
+                selector, noLocal)
+            {
+            }
+
+            protected override bool IsBrowser => true;
+
+        }
+    }
+}
+
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 425cf51..e1bb898 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -144,23 +144,68 @@
             return messageConsumer;
         }
 
-        private NmsConsumerId GetNextConsumerId()
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name)
         {
-            return new NmsConsumerId(SessionInfo.Id, consumerIdGenerator.IncrementAndGet());
+            return CreateDurableConsumer(destination, name, null, false);
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector)
+        {
+            return CreateDurableConsumer(destination, name, selector, false);
         }
 
         public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
         {
             CheckClosed();
 
-            NmsMessageConsumer messageConsumer = new NmsDurableTopicSubscriber(GetNextConsumerId(), this, destination, name, selector, noLocal);
+            NmsMessageConsumer messageConsumer = new NmsDurableMessageConsumer(GetNextConsumerId(), this, destination, name, selector, noLocal);
             messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
 
             return messageConsumer;
         }
 
+        public IMessageConsumer CreateSharedConsumer(ITopic destination, string name)
+        {
+            return CreateSharedConsumer(destination, name, null);
+        }
+
+        public IMessageConsumer CreateSharedConsumer(ITopic destination, string name, string selector)
+        {
+            CheckClosed();
+
+            NmsMessageConsumer messageConsumer = new NmsSharedMessageConsumer(GetNextConsumerId(), this, destination, name, selector, false);
+            messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
+            
+            return messageConsumer;
+        }
+
+        public IMessageConsumer CreateSharedDurableConsumer(ITopic destination, string name)
+        {
+            return CreateSharedDurableConsumer(destination, name, null);
+        }
+
+        public IMessageConsumer CreateSharedDurableConsumer(ITopic destination, string name, string selector)
+        {
+            CheckClosed();
+
+            NmsMessageConsumer messageConsumer = new NmsSharedDurableMessageConsumer(GetNextConsumerId(), this, destination, name, selector, false);
+            messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
+            
+            return messageConsumer;
+        }
+
+        public NmsConsumerId GetNextConsumerId()
+        {
+            return new NmsConsumerId(SessionInfo.Id, consumerIdGenerator.IncrementAndGet());
+        }
+
         public void DeleteDurableConsumer(string name)
         {
+            Unsubscribe(name);
+        }
+
+        public void Unsubscribe(string name)
+        {
             CheckClosed();
 
             Connection.Unsubscribe(name);
@@ -168,12 +213,14 @@
 
         public IQueueBrowser CreateBrowser(IQueue queue)
         {
-            throw new NotImplementedException();
+            return CreateBrowser(queue, null);
         }
 
         public IQueueBrowser CreateBrowser(IQueue queue, string selector)
         {
-            throw new NotImplementedException();
+            CheckClosed();
+
+            return new NmsQueueBrowser(this, queue, selector);
         }
 
         public IQueue GetQueue(string name)
@@ -288,6 +335,13 @@
                 Start();
         }
 
+        public void Acknowledge()
+        {
+            if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) {
+                Acknowledge(AckType.ACCEPTED);
+            }
+        }
+
         public void Commit()
         {
             CheckClosed();
@@ -396,8 +450,18 @@
             Connection.Acknowledge(envelope, ackType).ConfigureAwait(false).GetAwaiter().GetResult();
         }
 
-        public void Send(NmsMessageProducer producer, IDestination destination, IMessage original, MsgDeliveryMode deliveryMode,
-            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp)
+        public void Send(NmsMessageProducer producer, IDestination destination, IMessage original,
+            MsgDeliveryMode deliveryMode,
+            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay)
+        {
+
+            SendAsync(producer, destination, original, deliveryMode, priority, timeToLive, disableMessageId,
+                disableMessageTimestamp, deliveryDelay).ConfigureAwait(false).GetAwaiter().GetResult();
+            
+        }
+
+        public Task SendAsync(NmsMessageProducer producer, IDestination destination, IMessage original, MsgDeliveryMode deliveryMode,
+            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay)
         {
             if (destination == null)
                 throw new InvalidDestinationException("Destination must not be null");
@@ -416,6 +480,7 @@
             DateTime timeStamp = DateTime.UtcNow;
 
             bool hasTTL = timeToLive > TimeSpan.Zero;
+            bool hasDelay = deliveryDelay > TimeSpan.Zero;
 
             if (!disableMessageTimestamp)
             {
@@ -446,6 +511,11 @@
                 original.NMSMessageId = outbound.NMSMessageId;
             }
 
+            if (hasDelay)
+            {
+                outbound.Facade.DeliveryTime = timeStamp + deliveryDelay;
+            }
+
             if (hasTTL)
                 outbound.Facade.Expiration = timeStamp + timeToLive;
             else
@@ -453,15 +523,15 @@
 
             outbound.OnSend(timeToLive);
 
-            bool sync = deliveryMode == MsgDeliveryMode.Persistent;
+            bool fireAndForget = deliveryMode == MsgDeliveryMode.NonPersistent;
 
-            TransactionContext.Send(new OutboundMessageDispatch
+            return TransactionContext.Send(new OutboundMessageDispatch
             {
                 Message = outbound,
                 ProducerId = producer.Info.Id,
                 ProducerInfo = producer.Info,
-                SendAsync = !sync
-            }).ConfigureAwait(false).GetAwaiter().GetResult();
+                FireAndForget = fireAndForget
+            });
         }
 
         internal void EnqueueForDispatch(NmsMessageConsumer.MessageDeliveryTask task)
diff --git a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs b/src/NMS.AMQP/NmsSharedDurableMessageConsumer.cs
similarity index 62%
copy from src/NMS.AMQP/NmsDurableTopicSubscriber.cs
copy to src/NMS.AMQP/NmsSharedDurableMessageConsumer.cs
index bc446bb..4a838bd 100644
--- a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
+++ b/src/NMS.AMQP/NmsSharedDurableMessageConsumer.cs
@@ -19,16 +19,19 @@
 
 namespace Apache.NMS.AMQP
 {
-    public class NmsDurableTopicSubscriber : NmsMessageConsumer
+    public class NmsSharedDurableMessageConsumer : NmsMessageConsumer
     {
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+        public NmsSharedDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
         {
         }
 
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+        public NmsSharedDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
         {
         }
 
         protected override bool IsDurableSubscription => true;
+        
+        protected override bool IsSharedSubscription => true;
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsSharedMessageConsumer.cs b/src/NMS.AMQP/NmsSharedMessageConsumer.cs
new file mode 100644
index 0000000..392a6a7
--- /dev/null
+++ b/src/NMS.AMQP/NmsSharedMessageConsumer.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.AMQP.Meta;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsSharedMessageConsumer : NmsMessageConsumer
+    {
+        public NmsSharedMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+        {
+        }
+
+        public NmsSharedMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+        {
+        }
+
+        protected override bool IsDurableSubscription => false;
+        
+        protected override bool IsSharedSubscription => true;
+
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 4692551..6b40430 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -21,6 +21,7 @@
 using System.Threading.Tasks;
 using Amqp;
 using Amqp.Framing;
+using Amqp.Types;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
 using Apache.NMS.AMQP.Provider.Amqp.Message;
@@ -63,6 +64,8 @@
         public string TopicPrefix => Info.TopicPrefix;
         public bool ObjectMessageUsesAmqpTypes { get; set; } = false;
         public NmsConnectionInfo Info { get; }
+        
+        public AmqpSubscriptionTracker SubscriptionTracker { get; } = new AmqpSubscriptionTracker();
 
         public INmsMessageFactory MessageFactory => messageFactory;
 
@@ -114,7 +117,8 @@
             {
                 SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
                 SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
-                SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
+                SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY,
+                SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS
             };
         }
 
@@ -127,6 +131,30 @@
             }
             else
             {
+                Symbol[] capabilities = open.OfferedCapabilities;
+                if (capabilities != null)
+                {
+                    if (Array.Exists(capabilities,
+                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY)))
+                    {
+                        Info.AnonymousRelaySupported = true;
+                    }
+
+                    if (Array.Exists(capabilities,
+                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY)))
+                    {
+                        Info.DelayedDeliverySupported = true;
+                    }
+
+                    if (Array.Exists(capabilities,
+                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS)))
+                    {
+                        Info.SharedSubsSupported = true;
+                    }
+                }
+
+
+
                 object value = SymbolUtil.GetFromFields(open.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
                 if (value is string topicPrefix)
                 {
@@ -138,7 +166,6 @@
                 {
                     Info.QueuePrefix = queuePrefix;
                 }
-
                 this.tsc.TrySetResult(true);
                 Provider.FireConnectionEstablished();
             }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 7ecf7e7..d4ceeac 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -42,6 +42,9 @@
         private ReceiverLink receiverLink;
         private readonly LinkedList<InboundMessageDispatch> messages;
         private readonly object syncRoot = new object();
+        
+        private bool validateSharedSubsLinkCapability;
+        private bool sharedSubsNotSupported;
 
         private readonly AmqpSession session;
         public IDestination Destination => info.Destination;
@@ -56,6 +59,7 @@
         }
 
         public NmsConsumerId ConsumerId => this.info.Id;
+        
 
         public Task Attach()
         {
@@ -69,38 +73,95 @@
                 RcvSettleMode = ReceiverSettleMode.First,
                 SndSettleMode = (info.IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
             };
-            string name;
-            if (info.IsDurable)
+
+            string receiverLinkName = null;
+
+            string subscriptionName = info.SubscriptionName;
+            if (!string.IsNullOrEmpty(subscriptionName))
             {
-                name = info.SubscriptionName;
+                AmqpConnection connection = session.Connection;
+
+                if (info.IsShared && !connection.Info.SharedSubsSupported) {
+                    validateSharedSubsLinkCapability = true;
+                }
+
+                AmqpSubscriptionTracker subTracker = connection.SubscriptionTracker;
+
+                // Validate subscriber type allowed given existing active subscriber types.
+                if (info.IsShared && info.IsDurable) {
+                    if(subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
+                        // Don't allow shared sub if there is already an active exclusive durable sub
+                        throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+                    }
+                } else if (!info.IsShared && info.IsDurable) {
+                    if (subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
+                        // Exclusive durable sub is already active
+                        throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+                    } else if (subTracker.IsActiveSharedDurableSub(subscriptionName)) {
+                        // Don't allow exclusive durable sub if there is already an active shared durable sub
+                        throw new NMSException("A shared durable subscription is already active with name '" + subscriptionName + "'");
+                    }
+                }
+
+                // Get the link name for the subscription. Throws if certain further validations fail.
+                receiverLinkName = subTracker.ReserveNextSubscriptionLinkName(subscriptionName, info);
             }
-            else
-            {
+
+            
+            if (receiverLinkName == null) {
                 string destinationAddress = source.Address ?? "";
-                name = "nms:receiver:" + info.Id
-                                       + (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
+                receiverLinkName = "nms:receiver:" + info.Id
+                                                   + (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
             }
 
             // TODO: Add timeout
             var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
-            receiverLink = new ReceiverLink(session.UnderlyingSession, name, attach, HandleOpened(tsc));
+            receiverLink = new ReceiverLink(session.UnderlyingSession, receiverLinkName, attach, HandleOpened(tsc));
             receiverLink.AddClosedCallback(HandleClosed(tsc));
             return tsc.Task;
         }
 
         private OnAttached HandleOpened(TaskCompletionSource<bool> tsc) => (link, attach) =>
         {
+            if (validateSharedSubsLinkCapability)
+            {
+                Symbol[] remoteOfferedCapabilities = attach.OfferedCapabilities;
+
+                bool supported = false;
+                if (remoteOfferedCapabilities != null)
+                {
+                    if (Array.Exists(remoteOfferedCapabilities, symbol => SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS.Equals(symbol)))
+                    {
+                        supported = true;
+                    }
+                }
+
+                if (!supported)
+                {
+                    sharedSubsNotSupported = true;
+
+                    if (info.IsDurable)
+                    {
+                        link.Detach(null);
+                    }
+                    else
+                    {
+                        link.Close();
+                    }
+                }
+            }
+
             if (IsClosePending(attach))
                 return;
 
             tsc.SetResult(true);
         };
 
-        private static bool IsClosePending(Attach attach)
+        private bool IsClosePending(Attach attach)
         {
             // When no link terminus was created, the peer will now detach/close us otherwise
             // we need to validate the returned remote source prior to open completion.
-            return attach.Source == null;
+            return sharedSubsNotSupported || attach.Source == null;
         }
 
         private ClosedCallback HandleClosed(TaskCompletionSource<bool> tsc) => (sender, error) =>
@@ -108,6 +169,7 @@
             NMSException exception = ExceptionSupport.GetException(error, "Received Amqp link detach with Error for link {0}", info.Id);
             if (!tsc.TrySetException(exception))
             {
+                session.Connection.SubscriptionTracker.ConsumerRemoved(info);
                 session.RemoveConsumer(info.Id);
 
                 // If session is not closed it means that the link was remotely detached 
@@ -142,13 +204,33 @@
                 source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
                 source.Durable = (int) TerminusDurability.NONE;
             }
+            
+            
 
             if (info.IsBrowser)
             {
                 source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
             }
 
-            source.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(info.Destination) };
+            
+            IList<Symbol> capabilities = new List<Symbol>();
+            Symbol typeCapability = SymbolUtil.GetTerminusCapabilitiesForDestination(info.Destination);
+            if (typeCapability != null)
+            {
+                capabilities.Add(typeCapability);
+            }
+            
+            if (info.IsShared) {
+                capabilities.Add(SymbolUtil.SHARED);
+
+                if(!info.IsExplicitClientId) {
+                    capabilities.Add(SymbolUtil.GLOBAL);
+                }
+            }
+
+            if (capabilities.Any()) {
+                source.Capabilities = capabilities.ToArray();
+            }
 
             Map filters = new Map();
             
@@ -344,7 +426,7 @@
 
         public bool HasSubscription(string subscriptionName)
         {
-            return info.IsDurable && info.SubscriptionName.Equals(subscriptionName);
+            return (info.IsDurable || info.IsShared) && info.SubscriptionName.Equals(subscriptionName);
         }
 
         public void PostRollback()
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 4ccbdae..d52a278 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -29,6 +29,8 @@
 {
     public class AmqpProducer
     {
+        private static readonly OutcomeCallback _onOutcome = OnOutcome;
+        
         private readonly AmqpSession session;
         private readonly NmsProducerInfo info;
         private SenderLink senderLink;
@@ -117,7 +119,7 @@
             return target;
         }
 
-        public void Send(OutboundMessageDispatch envelope)
+        public async Task Send(OutboundMessageDispatch envelope)
         {
             if (envelope.Message.Facade is AmqpNmsMessageFacade facade)
             {
@@ -134,14 +136,12 @@
 
                     var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
 
-                    if (envelope.SendAsync)
-                        SendAsync(message, transactionalState);
-                    else
+                    if (envelope.FireAndForget)
+                    {
                         SendSync(message, transactionalState);
-                }
-                catch (TimeoutException tex)
-                {
-                    throw ExceptionSupport.GetTimeoutException(this.senderLink, tex.Message);
+                        return;
+                    }
+                    await SendAsync(message, transactionalState).ConfigureAwait(false);
                 }
                 catch (AmqpException amqpEx)
                 {
@@ -157,39 +157,56 @@
             }
         }
 
-        private void SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
+        private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
         {
             senderLink.Send(message, deliveryState, null, null);
         }
-
-        private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
+        
+        private async Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
         {
-            ManualResetEvent manualResetEvent = new ManualResetEvent(false);
-            Outcome outcome = null;
-
-            senderLink.Send(message, deliveryState, Callback, manualResetEvent);
-            if (!manualResetEvent.WaitOne((int) session.Connection.Provider.SendTimeout))
+            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+            CancellationTokenSource cts = null;
+            if (session.Connection.Provider.SendTimeout != NmsConnectionInfo.INFINITE)
             {
-                throw new TimeoutException(Fx.Format(SRAmqp.AmqpTimeout, "send", session.Connection.Provider.SendTimeout, nameof(message)));
+                cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(session.Connection.Provider.SendTimeout));
+                cts.Token.Register(_ =>
+                {
+                    var timeoutException = ExceptionSupport.GetTimeoutException(this.senderLink, $"The operation did not complete within the allocated time {session.Connection.Provider.SendTimeout}ms.");
+                    tcs.TrySetException(timeoutException);
+                }, null);
             }
-            if (outcome == null)
-                return;
-            
-            if (outcome.Descriptor.Name.Equals(MessageSupport.RELEASED_INSTANCE.Descriptor.Name))
+            try
             {
-                Error error = new Error(ErrorCode.MessageReleased);
-                throw ExceptionSupport.GetException(error, $"Message {message.Properties.GetMessageId()} released");
+                senderLink.Send(message, deliveryState, _onOutcome, tcs);
+                await tcs.Task.ConfigureAwait(false);
             }
-            if (outcome.Descriptor.Name.Equals(MessageSupport.REJECTED_INSTANCE.Descriptor.Name))
+            finally
+            {
+                cts?.Dispose();
+            }
+        }
+        
+        private static void OnOutcome(ILink sender, global::Amqp.Message message, Outcome outcome, object state)
+        {
+            var tcs = (TaskCompletionSource<bool>) state;
+            if (outcome.Descriptor.Code == MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code)
+            {
+                tcs.TrySetResult(true);
+            }
+            else if (outcome.Descriptor.Code == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
             {
                 Rejected rejected = (Rejected) outcome;
-                throw ExceptionSupport.GetException(rejected.Error, $"Message {message.Properties.GetMessageId()} rejected");
+                tcs.TrySetException(ExceptionSupport.GetException(rejected.Error, $"Message {message.Properties.GetMessageId()} rejected"));
             }
-            
-            void Callback(ILink l, global::Amqp.Message m, Outcome o, object s)
+            else if (outcome.Descriptor.Code == MessageSupport.RELEASED_INSTANCE.Descriptor.Code)
             {
-                outcome = o;
-                manualResetEvent.Set();
+                Error error = new Error(ErrorCode.MessageReleased);
+                tcs.TrySetException(ExceptionSupport.GetException(error, $"Message {message.Properties.GetMessageId()} released"));
+            }
+            else
+            {
+                Error error = new Error(ErrorCode.InternalError);
+                tcs.TrySetException(ExceptionSupport.GetException(error, outcome.ToString()));
             }
         }
 
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index e908fcd..c31bdeb 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -266,13 +266,12 @@
 
         public INmsMessageFactory MessageFactory => connection.MessageFactory;
 
-        public Task Send(OutboundMessageDispatch envelope)
+        public async Task Send(OutboundMessageDispatch envelope)
         {
             AmqpSession session = connection.GetSession(envelope.ProducerInfo.SessionId);
             AmqpProducer producer = session.GetProducer(envelope.ProducerId);
-            producer.Send(envelope);
+            await producer.Send(envelope).ConfigureAwait(false);
             envelope.Message.IsReadOnly = false;
-            return Task.CompletedTask;
         }
 
         public Task Unsubscribe(string subscriptionName)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs
new file mode 100644
index 0000000..5679594
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs
@@ -0,0 +1,327 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.NMS.AMQP.Meta;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+    public class AmqpSubscriptionTracker
+    {
+
+        // Subscription Name Delimiter
+        public readonly static string SUB_NAME_DELIMITER = "|";
+
+        private readonly ISet<string> exclusiveDurableSubs = new HashSet<string>();
+
+        private readonly IDictionary<string, SubDetails> sharedDurableSubs =
+            new ConcurrentDictionary<string, SubDetails>();
+
+        private readonly IDictionary<string, SubDetails> sharedVolatileSubs =
+            new ConcurrentDictionary<string, SubDetails>();
+
+        public string ReserveNextSubscriptionLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+        {
+            ValidateSubscriptionName(subscriptionName);
+
+            if (consumerInfo == null)
+            {
+                throw new ArgumentException("Consumer info must not be null.");
+            }
+
+            if (consumerInfo.IsShared)
+            {
+                if (consumerInfo.IsDurable)
+                {
+                    return GetSharedDurableSubLinkName(subscriptionName, consumerInfo);
+                }
+                else
+                {
+                    return GetSharedVolatileSubLinkName(subscriptionName, consumerInfo);
+                }
+            }
+            else if (consumerInfo.IsDurable)
+            {
+                RegisterExclusiveDurableSub(subscriptionName);
+                return subscriptionName;
+            }
+            else
+            {
+                throw new IllegalStateException(
+                    "Non-shared non-durable sub link naming is not handled by the tracker.");
+            }
+        }
+
+        private void ValidateSubscriptionName(string subscriptionName)
+        {
+            if (string.IsNullOrEmpty(subscriptionName))
+            {
+                throw new ArgumentException("Subscription name must not be null or empty.");
+            }
+
+            if (subscriptionName.Contains(SUB_NAME_DELIMITER))
+            {
+                throw new ArgumentException(
+                    "Subscription name must not contain '" + SUB_NAME_DELIMITER + "' character.");
+            }
+        }
+
+        private string GetSharedDurableSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+        {
+            IDestination topic = consumerInfo.Destination;
+            string selector = consumerInfo.Selector;
+
+            SubDetails subDetails = null;
+            if (sharedDurableSubs.ContainsKey(subscriptionName))
+            {
+                subDetails = sharedDurableSubs[subscriptionName];
+
+                if (subDetails.Matches(topic, selector))
+                {
+                    subDetails.AddSubscriber(consumerInfo);
+                }
+                else
+                {
+                    throw new NMSException("Subscription details dont match existing subscriber.");
+                }
+            }
+            else
+            {
+                subDetails = new SubDetails(topic, selector, consumerInfo);
+            }
+
+            sharedDurableSubs.Add(subscriptionName, subDetails);
+
+            int count = subDetails.TotalSubscriberCount();
+
+            return GetDurableSubscriptionLinkName(subscriptionName, consumerInfo.IsExplicitClientId, count);
+        }
+
+        private string GetDurableSubscriptionLinkName(string subscriptionName, bool hasClientID, int count)
+        {
+            string linkName = GetFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
+            if (count > 1)
+            {
+                if (hasClientID)
+                {
+                    linkName += SUB_NAME_DELIMITER + count;
+                }
+                else
+                {
+                    linkName += count;
+                }
+            }
+
+            return linkName;
+        }
+
+        public string GetFirstDurableSubscriptionLinkName(string subscriptionName, bool hasClientID)
+        {
+            ValidateSubscriptionName(subscriptionName);
+
+            String receiverLinkName = subscriptionName;
+            if (!hasClientID)
+            {
+                receiverLinkName += SUB_NAME_DELIMITER + "global";
+            }
+
+            return receiverLinkName;
+        }
+
+        private String GetSharedVolatileSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+        {
+            IDestination topic = consumerInfo.Destination;
+            string selector = consumerInfo.Selector;
+
+            SubDetails subDetails = null;
+            if (sharedVolatileSubs.ContainsKey(subscriptionName))
+            {
+                subDetails = sharedVolatileSubs[subscriptionName];
+
+                if (subDetails.Matches(topic, selector))
+                {
+                    subDetails.AddSubscriber(consumerInfo);
+                }
+                else
+                {
+                    throw new NMSException("Subscription details dont match existing subscriber");
+                }
+            }
+            else
+            {
+                subDetails = new SubDetails(topic, selector, consumerInfo);
+            }
+
+            sharedVolatileSubs.Add(subscriptionName, subDetails);
+
+            string receiverLinkName = subscriptionName + SUB_NAME_DELIMITER;
+            int count = subDetails.TotalSubscriberCount();
+
+            if (consumerInfo.IsExplicitClientId)
+            {
+                receiverLinkName += "volatile" + count;
+            }
+            else
+            {
+                receiverLinkName += "global-volatile" + count;
+            }
+
+            return receiverLinkName;
+        }
+
+        private void RegisterExclusiveDurableSub(String subscriptionName)
+        {
+            exclusiveDurableSubs.Add(subscriptionName);
+        }
+
+        /**
+         * Checks if there is an exclusive durable subscription already
+         * recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is an exclusive durable sub with this name already active
+         */
+        public bool IsActiveExclusiveDurableSub(String subscriptionName)
+        {
+            return exclusiveDurableSubs.Contains(subscriptionName);
+        }
+
+        /**
+         * Checks if there is a shared durable subscription already
+         * recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is a shared durable sub with this name already active
+         */
+        public bool IsActiveSharedDurableSub(string subscriptionName)
+        {
+            return sharedDurableSubs.ContainsKey(subscriptionName);
+        }
+
+        /**
+         * Checks if there is either a shared or exclusive durable subscription
+         * already recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is a durable sub with this name already active
+         */
+        public bool IsActiveDurableSub(string subscriptionName)
+        {
+            return IsActiveExclusiveDurableSub(subscriptionName) || IsActiveSharedDurableSub(subscriptionName);
+        }
+
+        /**
+         * Checks if there is an shared volatile subscription already
+         * recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is a shared volatile sub with this name already active
+         */
+        public bool IsActiveSharedVolatileSub(String subscriptionName)
+        {
+            return sharedVolatileSubs.ContainsKey(subscriptionName);
+        }
+
+        public void ConsumerRemoved(NmsConsumerInfo consumerInfo)
+        {
+            string subscriptionName = consumerInfo.SubscriptionName;
+
+            if (!string.IsNullOrEmpty(subscriptionName))
+            {
+                if (consumerInfo.IsShared)
+                {
+                    if (consumerInfo.IsDurable)
+                    {
+                        if (sharedDurableSubs.ContainsKey(subscriptionName))
+                        {
+                            SubDetails subDetails = sharedDurableSubs[subscriptionName];
+                            subDetails.RemoveSubscriber(consumerInfo);
+
+                            int count = subDetails.ActiveSubscribers();
+                            if (count < 1)
+                            {
+                                sharedDurableSubs.Remove(subscriptionName);
+                            }
+                        }
+                    }
+                    else
+                    {
+                        if (sharedVolatileSubs.ContainsKey(subscriptionName))
+                        {
+                            SubDetails subDetails = sharedVolatileSubs[subscriptionName];
+                            subDetails.RemoveSubscriber(consumerInfo);
+
+                            int count = subDetails.ActiveSubscribers();
+                            if (count < 1)
+                            {
+                                sharedVolatileSubs.Remove(subscriptionName);
+                            }
+                        }
+                    }
+                }
+                else if (consumerInfo.IsDurable)
+                {
+                    exclusiveDurableSubs.Remove(subscriptionName);
+                }
+            }
+        }
+
+        private class SubDetails
+        {
+            private IDestination topic = null;
+            private String selector = null;
+            private ISet<NmsConsumerInfo> subscribers = new HashSet<NmsConsumerInfo>();
+            private int totalSubscriberCount;
+
+            public SubDetails(IDestination topic, string selector, NmsConsumerInfo info)
+            {
+                this.topic = topic ?? throw new ArgumentException("Topic destination must not be null");
+                this.selector = selector;
+                AddSubscriber(info);
+            }
+
+            public void AddSubscriber(NmsConsumerInfo info)
+            {
+                if (info == null)
+                {
+                    throw new ArgumentException("Consumer info must not be null");
+                }
+
+                totalSubscriberCount++;
+                subscribers.Add(info);
+            }
+
+            public void RemoveSubscriber(NmsConsumerInfo info)
+            {
+                subscribers.Remove(info);
+            }
+
+            public int ActiveSubscribers()
+            {
+                return subscribers.Count;
+            }
+
+            public int TotalSubscriberCount()
+            {
+                return totalSubscriberCount;
+            }
+
+            public bool Matches(IDestination newTopic, string newSelector)
+            {
+                if (!topic.Equals(newTopic))
+                {
+                    return false;
+                }
+
+                if (selector == null)
+                {
+                    return newSelector == null;
+                }
+                else
+                {
+                    return selector.Equals(newSelector);
+                }
+            }
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
index 4185271..e61ddfb 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
@@ -163,7 +163,7 @@
             Message.BodySection = EMPTY_DATA;
         }
 
-        public virtual bool HasBody()
+        public override bool HasBody()
         {
             if (byteOut != null)
                 return byteOut.BaseStream.Length > 0;
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
index 7a588de..ae09084 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
@@ -67,7 +67,7 @@
             }
         }
 
-        public virtual bool HasBody()
+        public override bool HasBody()
         {
             return Map.Count > 0;
         }
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
index a6f53a7..4799ada 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Runtime.CompilerServices;
 using System.Text;
 using Amqp;
 using Amqp.Framing;
@@ -37,6 +38,7 @@
         private IDestination consumerDestination;
         private IAmqpConnection connection;
         private DateTime? syntheticExpiration;
+        private DateTime syntheticDeliveryTime;
         public global::Amqp.Message Message { get; private set; }
 
         public int RedeliveryCount
@@ -254,6 +256,32 @@
             }
         }
 
+        public DateTime DeliveryTime
+        {
+            get
+            {
+                object deliveryTime = GetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME);
+                switch (deliveryTime)
+                {
+                    case DateTime time:
+                        return time;
+                    case long _:
+                    case ulong _:
+                    case int _:
+                    case uint _:
+                        return new DateTime(621355968000000000L + Convert.ToInt64(deliveryTime) * 10000L, DateTimeKind.Utc);
+                    default:
+                        return syntheticDeliveryTime;
+                }
+            }
+            set
+            {
+                // Assumption that if it is being set through property, then it is with purpose of send out this value 
+                syntheticDeliveryTime = value;
+                SetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME, new DateTimeOffset(value).ToUnixTimeMilliseconds());
+            }
+        }
+
         public Header Header => Message.Header;
 
         public string GroupId
@@ -395,6 +423,12 @@
             {
                 syntheticExpiration = DateTime.UtcNow + ttl;
             }
+
+            if (GetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME) == null)
+            {
+                syntheticDeliveryTime = DateTime.UtcNow;
+            }
+            
         }
 
         protected virtual void InitializeBody()
@@ -445,11 +479,17 @@
             return copy;
         }
 
+        public virtual bool HasBody()
+        {
+            return false;
+        }
+
         protected void CopyInto(AmqpNmsMessageFacade target)
         {
             target.connection = connection;
             target.consumerDestination = consumerDestination;
             target.syntheticExpiration = syntheticExpiration;
+            target.syntheticDeliveryTime = syntheticDeliveryTime;
             target.amqpTimeToLiveOverride = amqpTimeToLiveOverride;
             target.destination = destination;
             target.replyTo = replyTo;
@@ -470,11 +510,18 @@
             return MessageAnnotations != null && MessageAnnotations.Map.ContainsKey(annotationName);
         }
 
-        public void SetMessageAnnotation(Symbol symbolKeyName, string value)
+        public void SetMessageAnnotation(Symbol symbolKeyName, object value)
         {
             LazyCreateMessageAnnotations();
             MessageAnnotations.Map.Add(symbolKeyName, value);
         }
+        
+        
+        public void RemoveMessageAnnotation(Symbol symbolKeyName)
+        {
+            if (Message.MessageAnnotations == null) return;
+            MessageAnnotations.Map.Remove(symbolKeyName);
+        }
 
         private void LazyCreateMessageAnnotations()
         {
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs
index 3e64d71..d8454fa 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs
@@ -29,7 +29,7 @@
 
         public IAmqpObjectTypeDelegate Delegate => typeDelegate;
 
-        public object Body
+        public object Object
         {
             get => Delegate.Object;
             set => Delegate.Object = value;
@@ -47,7 +47,7 @@
         {
             try
             {
-                Body = null;
+                Object = null;
             }
             catch (IOException e)
             {
@@ -91,5 +91,10 @@
             copy.typeDelegate = typeDelegate;
             return copy;
         }
+
+        public override bool HasBody()
+        {
+            return Object != null;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
index a7f63c9..18fc051 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
@@ -138,7 +138,7 @@
             return emptyList;
         }
 
-        public virtual bool HasBody() => !IsEmpty;
+        public override bool HasBody() => !IsEmpty;
 
         public override void ClearBody()
         {
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
index 371efcd..dd3cf81 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
@@ -71,7 +71,7 @@
             SetTextBody(null);
         }
 
-        public virtual bool HasBody()
+        public override bool HasBody()
         {
             try
             {
diff --git a/src/NMS.AMQP/Util/AtomicLong.cs b/src/NMS.AMQP/Util/AtomicLong.cs
index d43fae2..207179b 100644
--- a/src/NMS.AMQP/Util/AtomicLong.cs
+++ b/src/NMS.AMQP/Util/AtomicLong.cs
@@ -32,6 +32,16 @@
         {
             return Interlocked.Increment(ref value);
         }
+        
+        public long DecrementAndGet()
+        {
+            return Interlocked.Decrement(ref value);
+        }
+        
+        public long Get()
+        {
+            return Interlocked.Read(ref value);
+        }
 
         public static implicit operator long(AtomicLong atomicLong)
         {
diff --git a/src/NMS.AMQP/Util/SymbolUtil.cs b/src/NMS.AMQP/Util/SymbolUtil.cs
index 807bc7f..e3fe2c3 100644
--- a/src/NMS.AMQP/Util/SymbolUtil.cs
+++ b/src/NMS.AMQP/Util/SymbolUtil.cs
@@ -39,6 +39,7 @@
         public static readonly Symbol OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER = new Symbol("sole-connection-for-container");
         public static readonly Symbol OPEN_CAPABILITY_ANONYMOUS_RELAY = new Symbol("ANONYMOUS-RELAY");
         public static readonly Symbol OPEN_CAPABILITY_DELAYED_DELIVERY = new Symbol("DELAYED_DELIVERY");
+        public static readonly Symbol OPEN_CAPABILITY_SHARED_SUBS = new Symbol("SHARED-SUBS");
 
         // Attach Frame 
         public readonly static Symbol ATTACH_EXPIRY_POLICY_LINK_DETACH = new Symbol("link-detach");
@@ -62,6 +63,8 @@
         public static readonly Symbol JMSX_OPT_DEST = new Symbol("x-opt-jms-dest");
         public static readonly Symbol JMSX_OPT_REPLY_TO = new Symbol("x-opt-jms-reply-to");
 
+        public static readonly Symbol NMS_DELIVERY_TIME = new Symbol("x-opt-delivery-time");
+
         // Frame Property Value
         public readonly static Symbol BOOLEAN_TRUE = new Symbol("true");
         public readonly static Symbol BOOLEAN_FALSE = new Symbol("false");
@@ -122,6 +125,21 @@
             // unknown destination type...
             return null;
         }
+        
+        public static bool IsNumber(object value)
+        {
+            return value is sbyte
+                   || value is byte
+                   || value is short
+                   || value is ushort
+                   || value is int
+                   || value is uint
+                   || value is long
+                   || value is ulong
+                   || value is float
+                   || value is double
+                   || value is decimal;
+        }
 
     }
 }
diff --git a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
index 94b8723..40b2453 100644
--- a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
+++ b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
@@ -95,9 +95,9 @@
             }
         }
 
-        protected override object GetObjectProperty(string key) => properties[key];
+        public override object GetObject(string key) => properties[key];
 
-        protected override void SetObjectProperty(string key, object value)
+        public override void SetObject(string key, object value)
         {
             object objval = value;
 
diff --git a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs
index c527e1f..8826461 100644
--- a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs
+++ b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs
@@ -102,7 +102,7 @@
         /// </summary>
         /// <param name="key">Key to associated value.</param>
         /// <returns>Value for given Key.</returns>
-        protected override object GetObjectProperty(string key)
+        public override object GetObject(string key)
         {
             return this.value[key];
         }
@@ -112,7 +112,7 @@
         /// </summary>
         /// <param name="key">Key to associated value.</param>
         /// <param name="value">Value to set.</param>
-        protected override void SetObjectProperty(string key, object value)
+        public override void SetObject(string key, object value)
         {
             object objval = value;
             if(objval is IDictionary)
diff --git a/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs b/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs
index bc1943e..d51f5fb 100644
--- a/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs
+++ b/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs
@@ -52,25 +52,25 @@
 
         public object this[string key]
         {
-            get { return GetObjectProperty(key); }
+            get { return GetObject(key); }
 
             set
             {
                 CheckValidType(value);
-                SetObjectProperty(key, value);
+                SetObject(key, value);
             }
         }
 
         public bool GetBool(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(bool));
             return (bool) value;
         }
 
         public byte GetByte(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(byte));
             return (byte) value;
         }
@@ -79,7 +79,7 @@
 
         public char GetChar(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(char));
             return (char) value;
         }
@@ -88,21 +88,21 @@
 
         public double GetDouble(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(double));
             return (double) value;
         }
 
         public float GetFloat(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(float));
             return (float) value;
         }
 
         public int GetInt(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(int));
             return (int) value;
         }
@@ -111,7 +111,7 @@
 
         private T GetComplexType<T>(string key) where T : class
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             if (value is null)
                 return null;
             if (value is T complexValue)
@@ -122,90 +122,90 @@
 
         public long GetLong(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(long));
             return (long) value;
         }
 
         public short GetShort(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(short));
             return (short) value;
         }
 
         public string GetString(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(string));
             return (string) value;
         }
 
         public void SetBool(string key, bool value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetByte(string key, byte value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetBytes(string key, byte[] value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetBytes(string key, byte[] value, int offset, int length)
         {
             byte[] copy = new byte[length];
             Array.Copy(value, offset, copy, 0, length);
-            SetObjectProperty(key, copy);
+            SetObject(key, copy);
         }
 
         public void SetChar(string key, char value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetDictionary(string key, IDictionary dictionary)
         {
-            SetObjectProperty(key, dictionary);
+            SetObject(key, dictionary);
         }
 
         public void SetDouble(string key, double value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetFloat(string key, float value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetInt(string key, int value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetList(string key, IList list)
         {
-            SetObjectProperty(key, list);
+            SetObject(key, list);
         }
 
         public void SetLong(string key, long value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetShort(string key, short value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetString(string key, string value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         #endregion
@@ -213,8 +213,8 @@
         #region Protected Abstract Methods
 
         internal abstract object SyncRoot { get; }
-        protected abstract object GetObjectProperty(string key);
-        protected abstract void SetObjectProperty(string key, object value);
+        public abstract object GetObject(string key);
+        public abstract void SetObject(string key, object value);
 
         #endregion
 
@@ -262,7 +262,7 @@
                     }
 
                     first = false;
-                    object value = GetObjectProperty(key);
+                    object value = GetObject(key);
                     result = key + "=" + value;
                 }
             }
diff --git a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
index 2155d86..bf309c4 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
@@ -18,6 +18,7 @@
 using System;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using NMS.AMQP.Test.TestAmqp;
 using NUnit.Framework;
 
 namespace NMS.AMQP.Test
@@ -30,19 +31,32 @@
 
         protected string TestName => TestContext.CurrentContext.Test.Name;
 
+        static AmqpTestSupport()
+        {
+            Tracer.Trace = new NLogAdapter();
+        }
+        
         [TearDown]
         public void TearDown()
         {
             Connection?.Close();
         }
 
-        protected IConnection CreateAmqpConnection()
+        protected IConnection CreateAmqpConnectionStarted(string clientId = null)
+        {
+            var connection = CreateAmqpConnection(clientId);
+            connection.Start();
+            return connection;
+        }
+        
+        protected IConnection CreateAmqpConnection(string clientId = null)
         {
             string brokerUri = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_URI") ?? "amqp://127.0.0.1:5672";
             string userName = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CU") ?? "admin";
             string password = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CPWD") ?? "admin";
 
             NmsConnectionFactory factory = new NmsConnectionFactory(brokerUri);
+            factory.ClientId = clientId;
             return factory.CreateConnection(userName, password);
         }
 
@@ -108,11 +122,7 @@
             IQueue queue = session.GetQueue(TestName);
             IMessageConsumer consumer = session.CreateConsumer(queue);
 
-            IMessage message;
-            do
-            {
-                message = consumer.Receive(timeout);
-            } while (message != null);
+            PurgeConsumer(consumer, timeout);
 
             amqpConnection.Close();
         }
@@ -125,13 +135,18 @@
             ITopic queue = session.GetTopic(TestName);
             IMessageConsumer consumer = session.CreateConsumer(queue);
 
+            PurgeConsumer(consumer, timeout);
+
+            amqpConnection.Close();
+        }
+
+        protected void PurgeConsumer(IMessageConsumer consumer, TimeSpan timeout)
+        {
             IMessage message;
             do
             {
                 message = consumer.Receive(timeout);
             } while (message != null);
-
-            amqpConnection.Close();
         }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
index f7ce890..e167532 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
+++ b/test/Apache-NMS-AMQP-Interop-Test/Apache-NMS-AMQP-Interop-Test.csproj
@@ -16,11 +16,11 @@
 -->
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
-    <TargetFrameworks>net462;netcoreapp2.2</TargetFrameworks>
+    <TargetFrameworks>net462;netcoreapp3.1</TargetFrameworks>
     <TargetFramework Condition="'$(AppTargetFramework)' != ''">$(AppTargetFramework)</TargetFramework>
     <RootNamespace>NMS.AMQP.Test</RootNamespace>
     <AssemblyName>NMS.AMQP.Interop.Test</AssemblyName>
-    <LangVersion>7.3</LangVersion>
+    <LangVersion>8</LangVersion>
   </PropertyGroup>
   
   <ItemGroup>
@@ -33,5 +33,12 @@
   
   <ItemGroup>
     <ProjectReference Include="..\..\src\NMS.AMQP\Apache-NMS-AMQP.csproj" />
+    <ProjectReference Include="..\Apache-NMS-AMQP-Test\Apache-NMS-AMQP-Test.csproj" />
+  </ItemGroup>
+  
+  <ItemGroup>
+    <None Update="NLog.config">
+      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+    </None>
   </ItemGroup>
 </Project>
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NLog.config b/test/Apache-NMS-AMQP-Interop-Test/NLog.config
new file mode 100644
index 0000000..c0f1581
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NLog.config
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+    <targets>
+        <target name="logconsole" xsi:type="Console" layout="${time} | ${level} | ${callsite:includeNamespace=false:methodName=false} | ${message}" />
+    </targets>
+
+    <rules>
+        <logger name="*" minlevel="Debug" writeTo="logconsole" />
+    </rules>
+</nlog>
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
index 4529783..9f174d4 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -16,6 +16,11 @@
  */
 
 using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
 using Apache.NMS;
 using NUnit.Framework;
 
@@ -84,6 +89,223 @@
             Assert.IsNull(messageConsumer.Receive(TimeSpan.FromSeconds(1)));
         }
 
+
+        [Test, Timeout(60_000)]
+        public void TestDurableSubscription()
+        {
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            int counter = 0;
+
+
+            using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+            string subscriptionName = "mySubscriptionName";
+            ITopic topicProducer = sessionProducer.GetTopic(TestName);
+            using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+
+            // First durable consumer, reads message but does not unsubscribe
+            using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+            using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            {
+                using ITopic topic = session.GetTopic(TestName);
+                using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+                {
+                    // Purge topic
+                    PurgeConsumer(messageConsumer, TimeSpan.FromSeconds(0.5));
+
+                    ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+                    producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+                    var message = messageConsumer.Receive();
+                    Assert.AreEqual("text0", message.Body<string>());
+                }
+            }
+
+            // Write some more messages while subscription is closed
+            for (int t = 0; t < 3; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Second durable consumer, reads message that were send during no-subscription and unsubscribe
+            using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+            using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            {
+                using ITopic topic = session.GetTopic(TestName);
+                using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+                {
+                    for (int t = 1; t <= 3; t++)
+                    {
+                        var message = messageConsumer.Receive();
+                        Assert.AreEqual("text" + t, message.Body<string>());
+                    }
+
+                    // Assert topic is empty after those msgs
+                    var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+                    Assert.IsNull(msgAtTheEnd);
+
+                    Assert.Throws<IllegalStateException>(() => session.Unsubscribe(subscriptionName)); // Error unsubscribing while consumer is on
+                }
+
+                session.Unsubscribe(subscriptionName);
+            }
+
+
+            // Send some messages again to verify we will not get them when create durable subscription
+            for (int t = 0; t < 3; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + (counter++));
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Third durable subscriber, expect NOT to read messages during no-subscription period
+            using (var connectionSubscriber = CreateAmqpConnectionStarted("CLIENT1"))
+            using (ISession session = connectionSubscriber.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            {
+                using ITopic topic = session.GetTopic(TestName);
+                using (IMessageConsumer messageConsumer = session.CreateDurableConsumer(topic, subscriptionName, null))
+                {
+                    // Assert topic is empty 
+                    var msgAtTheEnd = messageConsumer.Receive(TimeSpan.FromSeconds(1));
+                    Assert.IsNull(msgAtTheEnd);
+                }
+
+                // And unsubscribe again
+                session.Unsubscribe(subscriptionName);
+            }
+        }
+
+
+        [Test, Timeout(60_000)]
+        public void TestSharedSubscription()
+        {
+            IMessageConsumer GetConsumer(string subscriptionName, String clientId)
+            {
+                var connection = CreateAmqpConnectionStarted(clientId);
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                var topic = session.GetTopic(TestName);
+                var messageConsumer = session.CreateSharedConsumer(topic, subscriptionName);
+                return messageConsumer;
+            }
+
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            string subscriptionName = "mySubscriptionName";
+            
+
+            var receivedMessages = new List<int>();
+
+            var messageConsumer1 = GetConsumer(subscriptionName, null);
+            var messageConsumer2 = GetConsumer(subscriptionName, null);
+            messageConsumer1.Listener += (msg) =>
+            {
+                receivedMessages.Add(1);
+                msg.Acknowledge();
+            };
+            messageConsumer2.Listener += (msg) =>
+            {
+                receivedMessages.Add(2);
+                msg.Acknowledge();
+            };
+            
+            // Now send some messages
+            using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITopic topicProducer = sessionProducer.GetTopic(TestName);
+            using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+            for (int t = 0; t < 10; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Give it some time to process
+            Thread.Sleep(TimeSpan.FromSeconds(2));
+            
+            // Assert message was routed to multiple consumers
+            Assert.AreEqual(2, receivedMessages.Distinct().Count());
+            Assert.AreEqual(10, receivedMessages.Count);
+        }
+        
+        [Test, Timeout(60_000)]
+        public void TestSharedDurableSubscription()
+        {
+            (IMessageConsumer,ISession,IConnection) GetConsumer(string subscriptionName, String clientId)
+            {
+                var connection = CreateAmqpConnection(clientId);
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                var topic = session.GetTopic(TestName);
+                var messageConsumer = session.CreateSharedDurableConsumer(topic, subscriptionName);
+                return (messageConsumer, session, connection);
+            }
+
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            string subscriptionName = "mySubscriptionName";
+            int messageSendCount = 1099;   
+
+            var receivedMessages = new ConcurrentBag<int>();
+
+            
+            IConnection connectionConsumer1, connectionConsumer2;
+            IMessageConsumer messageConsumer1, messageConsumer2;
+            
+            (messageConsumer1, _, connectionConsumer1) = GetConsumer(subscriptionName, null);
+            (messageConsumer2, _, connectionConsumer2) = GetConsumer(subscriptionName, null);
+            connectionConsumer1.Start();
+            connectionConsumer2.Start();
+            
+            messageConsumer1.Close();
+            messageConsumer2.Close();
+            connectionConsumer1.Close();
+            connectionConsumer2.Close();
+            
+            // Now send some messages
+            using ISession sessionProducer = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITopic topicProducer = sessionProducer.GetTopic(TestName);
+            using IMessageProducer producer = sessionProducer.CreateProducer(topicProducer);
+            for (int t = 0; t < messageSendCount; t++)
+            {
+                ITextMessage producerMessage = sessionProducer.CreateTextMessage("text" + t);
+                producer.Send(producerMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+            }
+
+            // Create consumers again and expect messages to be delivered to them
+            ISession sessionConsumer1, sessionConsumer2;
+            (messageConsumer1, sessionConsumer1, connectionConsumer1) = GetConsumer(subscriptionName, null);
+            (messageConsumer2, sessionConsumer2, connectionConsumer2) = GetConsumer(subscriptionName, null);
+            messageConsumer1.Listener += (msg) =>
+            {
+                receivedMessages.Add(1);
+                msg.Acknowledge();
+            };
+            messageConsumer2.Listener += (msg) =>
+            {
+                receivedMessages.Add(2);
+                msg.Acknowledge();
+            };
+            Task.Run(() => connectionConsumer1.Start()); // parallel to give both consumers chance to start at the same time
+            Task.Run(() => connectionConsumer2.Start());
+            
+            // Give it some time to process
+            Thread.Sleep(TimeSpan.FromSeconds(5));
+            
+            // Assert message was routed to multiple consumers
+            Assert.AreEqual(2, receivedMessages.Distinct().Count());
+            Assert.AreEqual(messageSendCount, receivedMessages.Count);
+            
+            messageConsumer1.Close();
+            messageConsumer2.Close();
+            sessionConsumer1.Unsubscribe(subscriptionName);
+            sessionConsumer2.Unsubscribe(subscriptionName);
+            
+        }
+
+
         [Test, Timeout(60_000)]
         public void TestSelectNoLocal()
         {
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
new file mode 100644
index 0000000..5bd182d
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageProducerTest.cs
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test
+{
+    [TestFixture]
+    public class NmsMessageProducerTest : AmqpTestSupport
+    {
+        [Test, Timeout(60_000)]
+        public void TestDeliveryDelay()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+            var deliveryDelay = TimeSpan.FromSeconds(7);
+            
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.DeliveryDelay = deliveryDelay;
+
+            DateTime? receivingTime = null;
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            var receivingTask = Task.Run(() =>
+            {
+                while (true)
+                {
+                    var message = consumer.Receive(TimeSpan.FromMilliseconds(100));
+                    if (message != null && message.Body<string>() == "Hello")
+                    {
+                        receivingTime = DateTime.Now;
+                        return;
+                    }
+                }
+            });
+            
+            
+            DateTime sendTime = DateTime.Now;
+            ITextMessage message = session.CreateTextMessage("Hello");
+            producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+            // Wait that delivery delay
+            Thread.Sleep(deliveryDelay);
+
+            receivingTask.Wait(TimeSpan.FromSeconds(20)); // make sure its done
+
+            var measuredDelay = (receivingTime.Value - sendTime);
+            
+            Assert.Greater(measuredDelay.TotalMilliseconds, deliveryDelay.TotalMilliseconds* 0.5);
+            Assert.Less(measuredDelay.TotalMilliseconds, deliveryDelay.TotalMilliseconds*1.5);
+        }
+
+      
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
index 68c6937..2de2f86 100644
--- a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
+++ b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
@@ -16,7 +16,7 @@
 -->
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
-    <TargetFrameworks>net462;netcoreapp2.2</TargetFrameworks>
+    <TargetFrameworks>net462;netcoreapp3.1</TargetFrameworks>
     <TargetFramework Condition="'$(AppTargetFramework)' != ''">$(AppTargetFramework)</TargetFramework>
     <RootNamespace>NMS.AMQP.Test</RootNamespace>
     <AssemblyName>NMS.AMQP.Test</AssemblyName>
diff --git a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
index 9a13133..fac9499 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
@@ -50,6 +50,26 @@
             return connection;
         }
 
+        protected INMSContext EstablishNMSContext(TestAmqpPeer testPeer, string optionsString = null, Symbol[] serverCapabilities = null, Fields serverProperties = null, bool setClientId = true, AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge)
+        {
+            testPeer.ExpectSaslPlain("guest", "guest");
+            testPeer.ExpectOpen(serverCapabilities: serverCapabilities, serverProperties: serverProperties);
+
+            // Each connection creates a session for managing temporary destinations etc.
+            testPeer.ExpectBegin();
+
+            var remoteUri = BuildUri(testPeer, optionsString);
+            var connectionFactory = new NmsConnectionFactory(remoteUri);
+            var context = connectionFactory.CreateContext("guest", "guest", acknowledgementMode);
+            if (setClientId)
+            {
+                // Set a clientId to provoke the actual AMQP connection process to occur.
+                context.ClientId = "ClientName";
+            }
+            
+            return context;
+        }
+        
         private static string BuildUri(TestAmqpPeer testPeer, string optionsString)
         {
             string baseUri = "amqp://127.0.0.1:" + testPeer.ServerPort.ToString();
diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
new file mode 100644
index 0000000..dbe936a
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    [TestFixture]
+    public class MessageDeliveryTimeTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20000)]
+        public void TestReceiveMessageWithoutDeliveryTimeSet()
+        {
+            DoReceiveMessageDeliveryTime(null, null);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsDateTime()
+        {
+            DateTime deliveryTime = DateTimeOffset.FromUnixTimeMilliseconds(CurrentTimeInMillis() + 12345).DateTime.ToUniversalTime();
+            DoReceiveMessageDeliveryTime(deliveryTime, deliveryTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsULong()
+        {
+            ulong deliveryTime = (ulong) (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds((long) deliveryTime).DateTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsLong()
+        {
+            long deliveryTime = (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsInt()
+        {
+            int deliveryTime = (int) (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+        }
+
+        [Test, Timeout(20000)]
+        public void TestDeliveryTimeIsUInt()
+        {
+            uint deliveryTime = (uint) (CurrentTimeInMillis() + 12345);
+            DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+        }
+
+        private long CurrentTimeInMillis()
+        {
+            return new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
+        }
+
+        private void DoReceiveMessageDeliveryTime(object setDeliveryTimeAnnotation, DateTime? expectedDeliveryTime)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer, "amqp.traceFrames=true");
+                connection.Start();
+                testPeer.ExpectBegin();
+                var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                var message = CreateMessageWithNullContent();
+                if (setDeliveryTimeAnnotation != null)
+                {
+                    message.MessageAnnotations = message.MessageAnnotations ?? new MessageAnnotations();
+                    message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME] = setDeliveryTimeAnnotation;
+                }
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message);
+                testPeer.ExpectDisposition(true, (deliveryState) => { });
+
+                DateTime startingTimeFrom = DateTime.UtcNow;
+                var messageConsumer = session.CreateConsumer(queue);
+                var receivedMessage = messageConsumer.Receive(TimeSpan.FromMilliseconds(3000));
+                DateTime receivingTime = DateTime.UtcNow;
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+
+                Assert.IsNotNull(receivedMessage);
+                if (expectedDeliveryTime != null)
+                {
+                    Assert.AreEqual(receivedMessage.NMSDeliveryTime, expectedDeliveryTime.Value);
+                }
+                else
+                {
+                    Assert.LessOrEqual(receivedMessage.NMSDeliveryTime, receivingTime);
+                    Assert.GreaterOrEqual(receivedMessage.NMSDeliveryTime, startingTimeFrom);
+                }
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestDeliveryDelayNotSupportedThrowsException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+                Assert.Throws<NotSupportedException>(() => producer.DeliveryDelay = TimeSpan.FromMinutes(17));
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestDeliveryDelayHasItsReflectionInAmqpAnnotations()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                // Determine current time
+                TimeSpan deliveryDelay = TimeSpan.FromMinutes(17);
+                long currentUnixEpochTime = new DateTimeOffset(DateTime.UtcNow + deliveryDelay).ToUnixTimeMilliseconds();
+                long currentUnixEpochTime2 = new DateTimeOffset(DateTime.UtcNow + deliveryDelay + deliveryDelay).ToUnixTimeMilliseconds();
+
+                IConnection connection = base.EstablishConnection(testPeer,
+                    serverCapabilities: new Symbol[] {SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY, SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER});
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.DeliveryDelay = deliveryDelay;
+
+                // Create and transfer a new message
+                testPeer.ExpectTransfer(message =>
+                {
+                    Assert.GreaterOrEqual((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime);
+                    Assert.Less((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime2);
+
+                    Assert.IsTrue(message.Header.Durable);
+                });
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = session.CreateTextMessage();
+
+                producer.Send(textMessage);
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
new file mode 100644
index 0000000..c1bb33b
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
@@ -0,0 +1,975 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Apache.NMS;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    // Adapted from ConsumerIntegrationTest to use NMSContext
+    [TestFixture]
+    public class NMSConsumerIntegrationTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestCloseConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var consumer = context.CreateConsumer(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        // TODO No connection Listener in context
+        // [Test, Timeout(20_000)]
+        // public void TestRemotelyCloseConsumer()
+        // {
+        //     Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+        //     string errorMessage = "buba";
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         ManualResetEvent consumerClosed = new ManualResetEvent(false);
+        //         ManualResetEvent exceptionFired = new ManualResetEvent(false);
+        //
+        //         mockConnectionListener
+        //             .Setup(listener => listener.OnConsumerClosed(It.IsAny<IMessageConsumer>(), It.IsAny<Exception>()))
+        //             .Callback(() => consumerClosed.Set());
+        //
+        //         var context = (NmsContext) EstablishNMSContext(testPeer, "amqp.traceFrames=true");
+        //         context.ConnectionInterruptedListener += () => { consumerClosed.Set(); };// AddConnectionListener(mockConnectionListener.Object);}
+        //         // context.list ConnectionInterruptedListener += () => { consumerClosed.Set(); };// AddConnectionListener(mockConnectionListener.Object);}
+        //         context.ExceptionListener += exception => { exceptionFired.Set(); };
+        //
+        //         testPeer.ExpectBegin();
+        //         // ISession session = context.CreateSession(AcknowledgementMode.AutoAcknowledge);
+        //
+        //         // Create a consumer, then remotely end it afterwards.
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlow();
+        //         testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, errorMessage: errorMessage, delayBeforeSend: 400);
+        //
+        //         IQueue queue = context.GetQueue("myQueue");
+        //         var consumer = context.CreateConsumer(queue);
+        //         
+        //         
+        //         // Verify the consumer gets marked closed
+        //         testPeer.WaitForAllMatchersToComplete(1000);
+        //
+        //         Assert.True(consumerClosed.WaitOne(2000), "Consumer closed callback didn't trigger");
+        //         Assert.False(exceptionFired.WaitOne(20), "Exception listener shouldn't fire with no MessageListener");
+        //
+        //         // Try closing it explicitly, should effectively no-op in client.
+        //         // The test peer will throw during close if it sends anything.
+        //         consumer.Close();
+        //     }
+        // }
+
+        // [Test, Timeout(20_000)]
+        // public void TestRemotelyCloseConsumerWithMessageListenerFiresExceptionListener()
+        // {
+        //     Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+        //     string errorMessage = "buba";
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         ManualResetEvent consumerClosed = new ManualResetEvent(false);
+        //         ManualResetEvent exceptionFired = new ManualResetEvent(false);
+        //
+        //         mockConnectionListener
+        //             .Setup(listener => listener.OnConsumerClosed(It.IsAny<IMessageConsumer>(), It.IsAny<Exception>()))
+        //             .Callback(() => consumerClosed.Set());
+        //
+        //         NmsConnection connection = (NmsConnection) EstablishConnection(testPeer);
+        //         connection.AddConnectionListener(mockConnectionListener.Object);
+        //         connection.ExceptionListener += exception => { exceptionFired.Set(); };
+        //
+        //         testPeer.ExpectBegin();
+        //         ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+        //
+        //         // Create a consumer, then remotely end it afterwards.
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlow();
+        //         testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, errorMessage: errorMessage, 10);
+        //
+        //         IQueue queue = session.GetQueue("myQueue");
+        //         IMessageConsumer consumer = session.CreateConsumer(queue);
+        //
+        //         consumer.Listener += message => { };
+        //
+        //         // Verify the consumer gets marked closed
+        //         testPeer.WaitForAllMatchersToComplete(1000);
+        //
+        //         Assert.True(consumerClosed.WaitOne(2000), "Consumer closed callback didn't trigger");
+        //         Assert.True(exceptionFired.WaitOne(2000), "Exception listener should have fired with a MessageListener");
+        //
+        //         // Try closing it explicitly, should effectively no-op in client.
+        //         // The test peer will throw during close if it sends anything.
+        //         consumer.Close();
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestReceiveMessageWithReceiveZeroTimeout()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                var consumer = context.CreateConsumer(queue);
+                IMessage message = consumer.Receive();
+                Assert.NotNull(message, "A message should have been received");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(10000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestExceptionInOnMessageReleasesInAutoAckMode()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
+                testPeer.ExpectDispositionThatIsReleasedAndSettled();
+
+                var consumer = context.CreateConsumer(queue);
+                consumer.Listener += message => throw new Exception();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(10000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCloseDurableTopicSubscriberDetachesWithCloseFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                
+                string topicName = "myTopic";
+                string subscriptionName = "mySubscription";
+                ITopic topic = context.GetTopic(topicName);
+
+                testPeer.ExpectDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateDurableConsumer(topic, subscriptionName, null, false);
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
+                durableConsumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerReceiveThrowsIfConnectionLost()
+        {
+            DoTestConsumerReceiveThrowsIfConnectionLost(false);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerTimedReceiveThrowsIfConnectionLost()
+        {
+            DoTestConsumerReceiveThrowsIfConnectionLost(true);
+        }
+
+        private void DoTestConsumerReceiveThrowsIfConnectionLost(bool useTimeout)
+        {
+            ManualResetEvent consumerReady = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("queue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.RunAfterLastHandler(() => { consumerReady.WaitOne(2000); });
+                testPeer.DropAfterLastMatcher(delay: 10);
+
+                var consumer = context.CreateConsumer(queue);
+                consumerReady.Set();
+
+                try
+                {
+                    if (useTimeout)
+                    {
+                        consumer.Receive(TimeSpan.FromMilliseconds(10_0000));
+                    }
+                    else
+                    {
+                        consumer.Receive();
+                    }
+
+
+                    Assert.Fail("An exception should have been thrown");
+                }
+                catch (NMSException)
+                {
+                    // Expected
+                }
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        //  TODO No connection Listener in context
+        // [Test, Timeout(20_000)]
+        // public void TestConsumerReceiveNoWaitThrowsIfConnectionLost()
+        // {
+        //     ManualResetEvent disconnected = new ManualResetEvent(false);
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         NmsContext context = (NmsContext) EstablishNMSContext(testPeer);
+        //         Mock<INmsConnectionListener> connectionListener = new Mock<INmsConnectionListener>();
+        //
+        //         connectionListener
+        //             .Setup(listener => listener.OnConnectionFailure(It.IsAny<NMSException>()))
+        //             .Callback(() => { disconnected.Set(); });
+        //
+        //         context.AddConnectionListener(connectionListener.Object);
+        //
+        //         context.Start();
+        //
+        //         testPeer.ExpectBegin();
+        //
+        //         IQueue queue = context.GetQueue("queue");
+        //
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlow();
+        //         testPeer.RemotelyCloseConnection(expectCloseResponse: true, errorCondition: ConnectionError.CONNECTION_FORCED, errorMessage: "buba");
+        //
+        //         var consumer = context.CreateConsumer(queue);
+        //
+        //         Assert.True(disconnected.WaitOne(), "Connection should be disconnected");
+        //
+        //         try
+        //         {
+        //             consumer.ReceiveNoWait();
+        //             Assert.Fail("An exception should have been thrown");
+        //         }
+        //         catch (NMSException)
+        //         {
+        //             // Expected
+        //         }
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestSetMessageListenerAfterStartAndSend()
+        {
+            int messageCount = 4;
+            CountdownEvent latch = new CountdownEvent(messageCount);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), messageCount);
+
+                var consumer = context.CreateConsumer(destination);
+
+                for (int i = 0; i < messageCount; i++)
+                {
+                    testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+                }
+
+                consumer.Listener += message => latch.Signal();
+
+                Assert.True(latch.Wait(4000), "Messages not received within given timeout. Count remaining: " + latch.CurrentCount);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        // TODO Connection is started anyway when creating consumer
+        // [Test, Timeout(20_000)]
+        // public void TestNoReceivedMessagesWhenConnectionNotStarted()
+        // {
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         var context = EstablishNMSContext(testPeer);
+        //         
+        //         testPeer.ExpectBegin();
+        //
+        //         IQueue destination = context.GetQueue("myQueue");
+        //
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 3);
+        //
+        //         // cREATING CONSUMER STARTS CONNECTION
+        //         var consumer = context.CreateConsumer(destination);
+        //
+        //         // Wait for a message to arrive then try and receive it, which should not happen
+        //         // since the connection is not started.
+        //         Assert.Null(consumer.Receive(TimeSpan.FromMilliseconds(100)));
+        //
+        //         testPeer.ExpectEnd();
+        //         testPeer.ExpectClose();
+        //         context.Close();
+        //
+        //         testPeer.WaitForAllMatchersToComplete(2000);
+        //     }
+        // }
+
+        // TODO Connection is started anyway when creating consumer
+        // [Test, Timeout(20_000)]
+        // public void TestNoReceivedNoWaitMessagesWhenConnectionNotStarted()
+        // {
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         var context = EstablishNMSContext(testPeer);
+        //
+        //         testPeer.ExpectBegin();
+        //
+        //         IQueue destination = context.GetQueue("myQueue");
+        //
+        //         testPeer.ExpectReceiverAttach();
+        //         testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 3);
+        //
+        //         var consumer = context.CreateConsumer(destination);
+        //
+        //         // Wait for a message to arrive then try and receive it, which should not happen
+        //         // since the connection is not started.
+        //         Assert.Null(consumer.ReceiveNoWait());
+        //
+        //         testPeer.ExpectEnd();
+        //         testPeer.ExpectClose();
+        //         context.Close();
+        //
+        //         testPeer.WaitForAllMatchersToComplete(2000);
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestSyncReceiveFailsWhenListenerSet()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+
+                var consumer = context.CreateConsumer(destination);
+
+                consumer.Listener += message => { };
+
+                Assert.Catch<NMSException>(() => consumer.Receive(), "Should have thrown an exception.");
+                Assert.Catch<NMSException>(() => consumer.Receive(TimeSpan.FromMilliseconds(1000)), "Should have thrown an exception.");
+                Assert.Catch<NMSException>(() => consumer.ReceiveNoWait(), "Should have thrown an exception.");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateProducerInOnMessage()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                IQueue outbound = context.GetQueue("ForwardDest");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                testPeer.ExpectSenderAttach();
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull);
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                var consumer = context.CreateConsumer(destination);
+
+                consumer.Listener += message =>
+                {
+                    var producer = context.CreateProducer();
+                    producer.Send(outbound, message);
+                    producer.Close();
+                };
+
+                testPeer.WaitForAllMatchersToComplete(10_000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageListenerCallsConnectionCloseThrowsIllegalStateException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ManualResetEvent latch = new ManualResetEvent(false);
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        context.Close();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+
+                    latch.Set();
+                };
+
+                Assert.True(latch.WaitOne(4000), "Messages not received within given timeout.");
+                Assert.IsNotNull(exception);
+                Assert.IsInstanceOf<IllegalStateException>(exception);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                // testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageListenerCallsConnectionStopThrowsIllegalStateException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ManualResetEvent latch = new ManualResetEvent(false);
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        context.Stop();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+
+                    latch.Set();
+                };
+
+                Assert.True(latch.WaitOne(3000), "Messages not received within given timeout.");
+                Assert.IsNotNull(exception);
+                Assert.IsInstanceOf<IllegalStateException>(exception);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageListenerCallsSessionCloseThrowsIllegalStateException()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ManualResetEvent latch = new ManualResetEvent(false);
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        context.Close();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+
+                    latch.Set();
+                };
+
+                Assert.True(latch.WaitOne(3000), "Messages not received within given timeout.");
+                Assert.IsNotNull(exception);
+                Assert.IsInstanceOf<IllegalStateException>(exception);
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectEnd();
+                // testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        // TODO: To be fixed
+        [Test, Timeout(20_000), Ignore("Ignore")]
+        public void TestMessageListenerClosesItsConsumer()
+        {
+            var latch = new ManualResetEvent(false);
+            var exceptionListenerFired = new ManualResetEvent(false);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                context.ExceptionListener += _ => exceptionListenerFired.Set();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectLinkFlow(drain: true, sendDrainFlowResponse: true, creditMatcher: credit => Assert.AreEqual(99, credit)); // Not sure if expected credit is right
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                Exception exception = null;
+                consumer.Listener += message =>
+                {
+                    try
+                    {
+                        consumer.Close();
+                        latch.Set();
+                    }
+                    catch (Exception e)
+                    {
+                        exception = e;
+                    }
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(1000)), "Process not completed within given timeout");
+                Assert.IsNull(exception, "No error expected during close");
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                Assert.False(exceptionListenerFired.WaitOne(20), "Exception listener shouldn't have fired");
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestRecoverOrderingWithAsyncConsumer()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+            Exception asyncError = null;
+
+            int recoverCount = 5;
+            int messageCount = 8;
+            int testPayloadLength = 255;
+            string payload = Encoding.UTF8.GetString(new byte[testPayloadLength]);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, acknowledgementMode:AcknowledgementMode.ClientAcknowledge);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: new Amqp.Message() { BodySection = new AmqpValue() { Value = payload } },
+                    count: messageCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, messageCount)
+                );
+
+                var consumer = context.CreateConsumer(destination);
+                
+                bool complete = false;
+                int messageSeen = 0;
+                int expectedIndex = 0;
+                consumer.Listener += message =>
+                {
+                    if (complete)
+                    {
+                        return;
+                    }
+
+                    try
+                    {
+                        int actualIndex = message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER);
+                        Assert.AreEqual(expectedIndex, actualIndex, "Received Message Out Of Order");
+
+                        // don't ack the message until we receive it X times
+                        if (messageSeen < recoverCount)
+                        {
+                            context.Recover();
+                            messageSeen++;
+                        }
+                        else
+                        {
+                            messageSeen = 0;
+                            expectedIndex++;
+
+                            // Have the peer expect the accept the disposition (1-based, hence pre-incremented).
+                            testPeer.ExpectDisposition(settled: true,
+                                stateMatcher: state => Assert.AreEqual(state.Descriptor.Code, MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code
+                                ));
+
+                            message.Acknowledge();
+
+                            if (expectedIndex == messageCount)
+                            {
+                                complete = true;
+                                latch.Set();
+                            }
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        complete = true;
+                        asyncError = e;
+                        latch.Set();
+                    }
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromSeconds(15)), "Messages not received within given timeout.");
+                Assert.IsNull(asyncError, "Unexpected exception");
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCloseWaitsForAsyncDeliveryToComplete()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                consumer.Listener += _ =>
+                {
+                    latch.Set();
+                    Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSessionCloseWaitsForAsyncDeliveryToComplete()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                consumer.Listener += _ =>
+                {
+                    latch.Set();
+                    Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+                
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConnectionCloseWaitsForAsyncDeliveryToComplete()
+        {
+            ManualResetEvent latch = new ManualResetEvent(false);
+
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                IQueue destination = context.GetQueue("myQueue");
+                context.Start();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                consumer.Listener += _ =>
+                {
+                    latch.Set();
+                    Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                };
+
+                Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestRecoveredMessageShouldNotBeMutated()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, acknowledgementMode:AcknowledgementMode.ClientAcknowledge);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                IQueue destination = context.GetQueue("myQueue");
+                string originalPayload = "testMessage";
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue() { Value = originalPayload } }, count: 1);
+
+                var consumer = context.CreateConsumer(destination);
+                NmsTextMessage message = consumer.Receive() as NmsTextMessage;
+                Assert.NotNull(message);
+                message.IsReadOnlyBody = false;
+                message.Text = message.Text + "Received";
+                context.Recover();
+
+                ITextMessage recoveredMessage = consumer.Receive() as ITextMessage;
+                Assert.IsNotNull(recoveredMessage);
+                Assert.AreNotEqual(message.Text, recoveredMessage.Text);
+                Assert.AreEqual(originalPayload, recoveredMessage.Text);
+                Assert.AreNotSame(message, recoveredMessage);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs
new file mode 100644
index 0000000..a1b43e8
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSContextIntegrationTest.cs
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    // Adapted from SessionIntegrationTest to use NMSContext
+    [TestFixture]
+    public class NMSContextIntegrationTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestClose()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectClose();
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateProducer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectSenderAttach();
+
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectDetach(true, true, true);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                producer.Close();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                var consumer = context.CreateConsumer(context.GetQueue("myQueue"));
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateConsumerWithEmptySelector()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                IQueue queue = context.GetQueue("myQueue");
+                context.CreateConsumer(queue, "");
+                context.CreateConsumer(queue, "", noLocal: false);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateConsumerWithNullSelector()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                IQueue queue = context.GetQueue("myQueue");
+                context.CreateConsumer(queue, null);
+                context.CreateConsumer(queue, null, noLocal: false);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateDurableConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                string topicName = "myTopic";
+                ITopic topic = context.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateDurableConsumer(topic, subscriptionName, null, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+
+        [Test, Timeout(20_000)]
+        public void TestCreateTemporaryQueue()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                string dynamicAddress = "myTempQueueAddress";
+                testPeer.ExpectTempQueueCreationAttach(dynamicAddress);
+
+                ITemporaryQueue temporaryQueue = context.CreateTemporaryQueue();
+                Assert.NotNull(temporaryQueue, "TemporaryQueue object was null");
+                Assert.NotNull(temporaryQueue.QueueName, "TemporaryQueue queue name was null");
+                Assert.AreEqual(dynamicAddress, temporaryQueue.QueueName, "TemporaryQueue name not as expected");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateTemporaryTopic()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+
+                string dynamicAddress = "myTempTopicAddress";
+                testPeer.ExpectTempTopicCreationAttach(dynamicAddress);
+
+                ITemporaryTopic temporaryTopic = context.CreateTemporaryTopic();
+                Assert.NotNull(temporaryTopic, "TemporaryTopic object was null");
+                Assert.NotNull(temporaryTopic.TopicName, "TemporaryTopic name was null");
+                Assert.AreEqual(dynamicAddress, temporaryTopic.TopicName, "TemporaryTopic name not as expected");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                string topicName = "myTopic";
+                ITopic topic = context.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateSharedConsumer(topic, subscriptionName, null); //, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(20000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedDurableConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+
+                string topicName = "myTopic";
+                ITopic topic = context.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+
+                var durableConsumer = context.CreateSharedDurableConsumer(topic, subscriptionName, null); //, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs
new file mode 100644
index 0000000..22563a0
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSProducerIntegrationTest.cs
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    // Adapted from ProducerIntegrationTest to use NMSContext
+    [TestFixture]
+    public class NMSProducerIntegrationTest : IntegrationTestFixture
+    {
+        private const long TICKS_PER_MILLISECOND = 10000;
+
+        [Test, Timeout(20_000)]
+        public void TestCloseSender()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                producer.Close();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSentTextMessageCanBeModified()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create and transfer a new message
+                String text = "myMessage";
+                testPeer.ExpectTransfer(x => Assert.AreEqual(text, (x.BodySection as AmqpValue).Value));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage(text);
+                producer.Send(queue, message);
+
+                Assert.AreEqual(text, message.Text);
+                message.Text = text + text;
+                Assert.AreEqual(text + text, message.Text);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestDefaultDeliveryModeProducesDurableMessages()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create and transfer a new message
+                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = context.CreateTextMessage();
+
+                producer.Send(queue, textMessage);
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestProducerOverridesMessageDeliveryMode()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = base.EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue queue = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create and transfer a new message, explicitly setting the deliveryMode on the
+                // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+                // that the producer ignores this value and sends the message as PERSISTENT(/durable)
+                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = context.CreateTextMessage();
+                textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
+                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode);
+
+                producer.Send(queue, textMessage);
+
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                context.Close();
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageNonPersistentProducerSetDurableFalse()
+        {
+            DoSendingMessageNonPersistentTestImpl(true);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageNonPersistentProducerOmitsHeader()
+        {
+            DoSendingMessageNonPersistentTestImpl(false);
+        }
+
+        private void DoSendingMessageNonPersistentTestImpl(bool setPriority)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                //Add capability to indicate support for ANONYMOUS-RELAY
+                Symbol[] serverCapabilities = {SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY};
+                var context = EstablishNMSContext(testPeer, serverCapabilities: serverCapabilities);
+                testPeer.ExpectBegin();
+
+                string queueName = "myQueue";
+                Action<object> targetMatcher = t =>
+                {
+                    var target = t as Target;
+                    Assert.IsNotNull(target);
+                };
+
+
+                testPeer.ExpectSenderAttach(targetMatcher: targetMatcher, sourceMatcher: Assert.NotNull, senderSettled: false);
+
+                IQueue queue = context.GetQueue(queueName);
+                INMSProducer producer = context.CreateProducer();
+
+                byte priority = 5;
+                String text = "myMessage";
+                testPeer.ExpectTransfer(messageMatcher: message =>
+                    {
+                        if (setPriority)
+                        {
+                            Assert.IsFalse(message.Header.Durable);
+                            Assert.AreEqual(priority, message.Header.Priority);
+                        }
+
+                        Assert.AreEqual(text, (message.BodySection as AmqpValue).Value);
+                    }, stateMatcher: Assert.IsNull,
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true);
+
+                ITextMessage textMessage = context.CreateTextMessage(text);
+
+                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+                if (setPriority)
+                    producer.Priority = (MsgPriority) priority;
+
+                producer.Send(queue, textMessage);
+
+                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode, "Should have NonPersistent delivery mode set");
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSDestination()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                string text = "myMessage";
+                ITextMessage message = context.CreateTextMessage(text);
+
+                testPeer.ExpectTransfer(m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                Assert.IsNull(message.NMSDestination, "Should not yet have a NMSDestination");
+
+                producer.Send(destination, message);
+
+                Assert.AreEqual(destination, message.NMSDestination, "Should have had NMSDestination set");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSTimestamp()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                // Create matcher to expect the absolute-expiry-time field of the properties section to
+                // be set to a value greater than 'now'+ttl, within a delta.
+
+                DateTime creationLower = DateTime.UtcNow;
+                DateTime creationUpper = creationLower + TimeSpan.FromMilliseconds(3000);
+
+                var text = "myMessage";
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.That(m.Properties.CreationTime.Ticks, Is.GreaterThanOrEqualTo(creationLower.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.That(m.Properties.CreationTime.Ticks, Is.LessThanOrEqualTo(creationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+                });
+
+                ITextMessage message = context.CreateTextMessage(text);
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSExpirationRelatedAbsoluteExpiryAndTtlFields()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                uint ttl = 100_000;
+                DateTime currentTime = DateTime.UtcNow;
+                DateTime expirationLower = currentTime + TimeSpan.FromMilliseconds(ttl);
+                DateTime expirationUpper = currentTime + TimeSpan.FromMilliseconds(ttl) + TimeSpan.FromMilliseconds(5000);
+
+                // Create matcher to expect the absolute-expiry-time field of the properties section to
+                // be set to a value greater than 'now'+ttl, within a delta.
+                string text = "myMessage";
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.AreEqual(ttl, m.Header.Ttl);
+                    Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.GreaterThanOrEqualTo(expirationLower.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.LessThanOrEqualTo(expirationUpper.Ticks).Within(TICKS_PER_MILLISECOND));
+                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+                });
+
+                ITextMessage message = context.CreateTextMessage(text);
+                producer.TimeToLive = TimeSpan.FromMilliseconds(ttl);
+                producer.Priority = NMSConstants.defaultPriority;
+                producer.DeliveryMode = NMSConstants.defaultDeliveryMode;
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessagesAreProducedWithProperDefaultPriorityWhenNoPrioritySpecified()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                byte priority = 4;
+
+                testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage();
+                Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);
+
+                producer.Send(destination, message);
+
+                Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsNMSPriority()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                byte priority = 9;
+
+                testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority));
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage();
+                Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority);
+
+                producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                producer.Priority = (MsgPriority) priority;
+                producer.TimeToLive = NMSConstants.defaultTimeToLive;
+                producer.Send(destination, message);
+
+                Assert.AreEqual((MsgPriority) priority, message.NMSPriority);
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageSetsNMSMessageId()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                string text = "myMessage";
+                string actualMessageId = null;
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.IsNotEmpty(m.Properties.MessageId);
+                    actualMessageId = m.Properties.MessageId;
+                });
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage(text);
+                Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
+
+                producer.Send(destination, message);
+
+                Assert.IsNotNull(message.NMSMessageId);
+                Assert.IsNotEmpty(message.NMSMessageId, "NMSMessageId should be set");
+                Assert.IsTrue(message.NMSMessageId.StartsWith("ID:"), "MMS 'ID:' prefix not found");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+                // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
+                Assert.AreEqual(message.NMSMessageId, actualMessageId, "Expected NMSMessageId value to be present in AMQP message");
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageWithDisableMessageIdHint()
+        {
+            DoSendingMessageWithDisableMessageIdHintTestImpl(false);
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageWithDisableMessageIdHintAndExistingMessageId()
+        {
+            DoSendingMessageWithDisableMessageIdHintTestImpl(true);
+        }
+
+        private void DoSendingMessageWithDisableMessageIdHintTestImpl(bool existingId)
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                string text = "myMessage";
+                testPeer.ExpectTransfer(m =>
+                {
+                    Assert.IsTrue(m.Header.Durable);
+                    Assert.IsNull(m.Properties.MessageId); // Check there is no message-id value;
+                    Assert.AreEqual(text, (m.BodySection as AmqpValue).Value);
+                });
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                ITextMessage message = context.CreateTextMessage(text);
+
+                Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set");
+
+                if (existingId)
+                {
+                    string existingMessageId = "ID:this-should-be-overwritten-in-send";
+                    message.NMSMessageId = existingMessageId;
+                    Assert.AreEqual(existingMessageId, message.NMSMessageId, "NMSMessageId should now be se");
+                }
+
+                producer.DisableMessageID = true;
+
+                producer.Send(destination, message);
+
+                Assert.IsNull(message.NMSMessageId, "NMSMessageID should be null");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
+
+        // TODO No connection listener in nms context
+        // [Test, Timeout(20_000)]
+        // public void TestRemotelyCloseProducer()
+        // {
+        //     string breadCrumb = "ErrorMessageBreadCrumb";
+        //
+        //     ManualResetEvent producerClosed = new ManualResetEvent(false);
+        //     Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
+        //     mockConnectionListener
+        //         .Setup(listener => listener.OnProducerClosed(It.IsAny<NmsMessageProducer>(), It.IsAny<Exception>()))
+        //         .Callback(() => { producerClosed.Set(); });
+        //
+        //     using (TestAmqpPeer testPeer = new TestAmqpPeer())
+        //     {
+        //         NmsContext context = (NmsContext) EstablishNMSContext(testPeer);
+        //         context.AddConnectionListener(mockConnectionListener.Object);
+        //
+        //         testPeer.ExpectBegin();
+        //         ISession session = context.CreateSession(AcknowledgementMode.AutoAcknowledge);
+        //
+        //         // Create a producer, then remotely end it afterwards.
+        //         testPeer.ExpectSenderAttach();
+        //         testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, breadCrumb, delayBeforeSend: 10);
+        //
+        //         IQueue destination = session.GetQueue("myQueue");
+        //         IMessageProducer producer = session.CreateProducer(destination);
+        //
+        //         // Verify the producer gets marked closed
+        //         testPeer.WaitForAllMatchersToComplete(1000);
+        //
+        //         Assert.True(producerClosed.WaitOne(TimeSpan.FromMilliseconds(1000)), "Producer closed callback didn't trigger");
+        //         Assert.That(() => producer.DisableMessageID, Throws.Exception.InstanceOf<IllegalStateException>(), "Producer never closed");
+        //
+        //         // Try closing it explicitly, should effectively no-op in client.
+        //         // The test peer will throw during close if it sends anything.
+        //         producer.Close();
+        //     }
+        // }
+
+        [Test, Timeout(20_000)]
+        public void TestSendWhenLinkCreditIsZeroAndTimeout()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500");
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                ITextMessage message = context.CreateTextMessage("text");
+
+                // Expect the producer to attach. Don't send any credit so that the client will
+                // block on a send and we can test our timeouts.
+                testPeer.ExpectSenderAttachWithoutGrantingCredit();
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                var producer = context.CreateProducer();
+
+                Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out.");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendTimesOutWhenNoDispositionArrives()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500");
+                testPeer.ExpectBegin();
+
+                IQueue queue = context.GetQueue("myQueue");
+
+                ITextMessage message = context.CreateTextMessage("text");
+
+                // Expect the producer to attach and grant it some credit, it should send
+                // a transfer which we will not send any response for which should cause the
+                // send operation to time out.
+                testPeer.ExpectSenderAttach();
+                testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                var producer = context.CreateProducer();
+
+                Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out.");
+
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendWorksWhenConnectionNotStarted()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectTransfer(Assert.IsNotNull);
+
+                producer.Send(destination, context.CreateMessage());
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                producer.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendWorksAfterConnectionStopped()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+
+                testPeer.ExpectTransfer(Assert.IsNotNull);
+
+                context.Stop();
+
+                producer.Send(destination, context.CreateMessage());
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+
+                producer.Close();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessagePersistentSetsBatchableFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+                    stateMatcher: Assert.IsNull,
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    batchable: false);
+
+                IMessage message = context.CreateMessage();
+                producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                producer.Priority = MsgPriority.Normal;
+                producer.TimeToLive = NMSConstants.defaultTimeToLive;
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestSendingMessageNonPersistentSetsBatchableFalse()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var context = EstablishNMSContext(testPeer);
+                context.Start();
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                IQueue destination = context.GetQueue("myQueue");
+                var producer = context.CreateProducer();
+                testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull,
+                    stateMatcher: Assert.IsNull,
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    batchable: false);
+
+                IMessage message = context.CreateMessage();
+                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+                producer.Priority = MsgPriority.Normal;
+                producer.TimeToLive = NMSConstants.defaultTimeToLive;
+                producer.Send(destination, message);
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs
new file mode 100644
index 0000000..88d05df
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/ProducerIntegrationAsyncTest.cs
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Apache.NMS;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    [TestFixture]
+    public class ProducerIntegrationAsyncTest : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestSentAsyncIsAsynchronous()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Create and transfer a new message
+                String text = "myMessage";
+                testPeer.ExpectTransfer(messageMatcher: m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value),
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    stateMatcher: Assert.IsNull,
+                    dispositionDelay: 10); // 10ms should be enough
+                testPeer.ExpectClose();
+
+                ITextMessage message = session.CreateTextMessage(text);
+                var sendTask = producer.SendAsync(message);
+                // Instantly check if its not completed yet, we want async, so it should not be completed right after 
+                Assert.AreEqual(false, sendTask.IsCompleted);
+                
+                // And now wait for task to complete
+                sendTask.Wait(20_000);
+
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestSentAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Create and transfer a new message
+                String text = "myMessage";
+                testPeer.ExpectTransfer(messageMatcher: m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value),
+                    settled: false,
+                    sendResponseDisposition: true,
+                    responseState: new Accepted(),
+                    responseSettled: true,
+                    stateMatcher: Assert.IsNull,
+                    dispositionDelay: 10); // 10ms should be enough
+                testPeer.ExpectClose();
+
+                ITextMessage message = session.CreateTextMessage(text);
+                await producer.SendAsync(message);
+              
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+       
+
+        [Test, Timeout(20_000)]
+        public async Task TestProducerWorkWithAsyncAwait()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = base.EstablishConnection(testPeer);
+                testPeer.ExpectBegin();
+                testPeer.ExpectSenderAttach();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageProducer producer = session.CreateProducer(queue);
+
+                // Create and transfer a new message, explicitly setting the deliveryMode on the
+                // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+                // that the producer ignores this value and sends the message as PERSISTENT(/durable)
+                testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable));
+                testPeer.ExpectClose();
+
+                ITextMessage textMessage = session.CreateTextMessage();
+                textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
+                Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode);
+
+                await producer.SendAsync(textMessage);
+
+                Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+                connection.Close();
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
index 4fbdc29..da336e8 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/SessionIntegrationTest.cs
@@ -227,5 +227,62 @@
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
+       
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                
+                string topicName = "myTopic";
+                ITopic topic = session.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+                
+                IMessageConsumer durableConsumer = session.CreateSharedConsumer(topic, subscriptionName, null);//, false);
+                // IMessageConsumer durableConsumer = session.CreateDurableConsumer(topic, subscriptionName, null, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(20000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestCreateSharedDurableConsumer()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                
+                string topicName = "myTopic";
+                ITopic topic = session.GetTopic(topicName);
+                string subscriptionName = "mySubscription";
+
+                testPeer.ExpectSharedDurableSubscriberAttach(topicName, subscriptionName);
+                testPeer.ExpectLinkFlow();
+                
+                IMessageConsumer durableConsumer = session.CreateSharedDurableConsumer(topic, subscriptionName, null); //, false);
+                Assert.NotNull(durableConsumer, "MessageConsumer object was null");
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
index 69ff292..89b4874 100644
--- a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
@@ -60,6 +60,7 @@
         public IDestination NMSReplyTo { get; set; }
         public DateTime NMSTimestamp { get; set; }
         public string NMSType { get; set; }
+        public DateTime DeliveryTime { get; set; }
         public string GroupId { get; set; }
         public uint GroupSequence { get; set; }
         public DateTime? Expiration { get; set; }
@@ -71,5 +72,10 @@
         {
             return null;
         }
+
+        public bool HasBody()
+        {
+            return true;
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs
index 2ef3bfb..29a6f18 100644
--- a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs
@@ -21,11 +21,11 @@
 {
     public class NmsTestObjectMessageFacade : NmsTestMessageFacade, INmsObjectMessageFacade
     {
-        public object Body { get; set; }
+        public object Object { get; set; }
 
         public override void ClearBody()
         {
-            Body = null;
+            Object = null;
         }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs b/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs
index c31eb4d..e9f0c38 100644
--- a/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs
@@ -41,6 +41,16 @@
             message.ClearProperties();
         }
 
+        public T Body<T>()
+        {
+            return message.Body<T>();
+        }
+
+        public bool IsBodyAssignableTo(Type type)
+        {
+            return message.IsBodyAssignableTo(type);
+        }
+
         public IPrimitiveMap Properties => message.Properties;
 
         public string NMSCorrelationID
@@ -102,5 +112,11 @@
             get => message.NMSType;
             set => message.NMSType = value;
         }
+
+        public DateTime NMSDeliveryTime
+        {
+            get => message.NMSDeliveryTime;
+            set => message.NMSDeliveryTime = value;
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs b/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs
index b2b5f65..fdd32a9 100644
--- a/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs
@@ -39,7 +39,7 @@
         {
             string content = "myStringContent";
             NmsTestObjectMessageFacade facade = new NmsTestObjectMessageFacade();
-            facade.Body = content;
+            facade.Object = content;
             NmsObjectMessage objectMessage = new NmsObjectMessage(facade);
             objectMessage.OnDispatch();
 
@@ -51,7 +51,7 @@
         {
             string content = "myStringContent";
             NmsTestObjectMessageFacade facade = new NmsTestObjectMessageFacade();
-            facade.Body = content;
+            facade.Object = content;
             NmsObjectMessage objectMessage = new NmsObjectMessage(facade);
             objectMessage.OnDispatch();
 
@@ -65,14 +65,14 @@
         {
             string content = "myStringContent";
             NmsTestObjectMessageFacade facade = new NmsTestObjectMessageFacade();
-            facade.Body = content;
+            facade.Object = content;
             NmsObjectMessage objectMessage = new NmsObjectMessage(facade);
             objectMessage.OnDispatch();
 
             Assert.NotNull(objectMessage.Body);
             objectMessage.ClearBody();
 
-            Assert.Null(facade.Body);
+            Assert.Null(facade.Object);
         }
 
         [Test]
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs
index 562048f..8ac51f4 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs
@@ -123,7 +123,7 @@
             Assert.IsInstanceOf<AmqpNmsObjectMessageFacade>(facade);
             Assert.IsNull(facade.JmsMsgType);
 
-            Assert.IsNull(((AmqpNmsObjectMessageFacade) facade).Body);
+            Assert.IsNull(((AmqpNmsObjectMessageFacade) facade).Object);
         }
 
 
@@ -141,8 +141,8 @@
 
             AmqpNmsObjectMessageFacade objectMessageFacade = (AmqpNmsObjectMessageFacade) facade;
 
-            Assert.IsNotNull(objectMessageFacade.Body);
-            Assert.IsInstanceOf<SerializableClass>(objectMessageFacade.Body);
+            Assert.IsNotNull(objectMessageFacade.Object);
+            Assert.IsInstanceOf<SerializableClass>(objectMessageFacade.Object);
         }
 
         [Test]
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs
index 29a077f..2a9b224 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs
@@ -57,7 +57,7 @@
         private void DoNewMessageToSendReturnsNullObjectTestImpl(bool amqpTyped)
         {
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade(amqpTyped);
-            Assert.Null(amqpObjectMessageFacade.Body);
+            Assert.Null(amqpObjectMessageFacade.Object);
         }
 
         [Test]
@@ -96,7 +96,7 @@
             String content = "myStringContent";
 
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade();
-            amqpObjectMessageFacade.Body = content;
+            amqpObjectMessageFacade.Object = content;
 
             var bytes = GetSerializedBytes(content);
 
@@ -118,7 +118,7 @@
             String content = "myStringContent";
 
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade(true);
-            amqpObjectMessageFacade.Body = content;
+            amqpObjectMessageFacade.Object = content;
 
             // retrieve the body from the underlying message, check it matches expectation
             RestrictedDescribed section = amqpObjectMessageFacade.Message.BodySection;
@@ -149,9 +149,9 @@
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
             Assert.NotNull(facade.Message.BodySection, "Expected existing body section to be found");
-            facade.Body = null;
+            facade.Object = null;
             Assert.AreSame(AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, facade.Message.BodySection, "Expected existing body section to be replaced");
-            Assert.Null(facade.Body);
+            Assert.Null(facade.Object);
         }
 
         /*
@@ -179,7 +179,7 @@
             facade.ClearBody();
 
             Assert.AreSame(AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, facade.Message.BodySection, "Expected existing body section to be replaced");
-            Assert.Null(facade.Body);
+            Assert.Null(facade.Object);
         }
 
         /*
@@ -195,12 +195,12 @@
             };
 
             AmqpNmsObjectMessageFacade facade = CreateNewObjectMessageFacade(false);
-            facade.Body = origMap;
+            facade.Object = origMap;
 
             Dictionary<string, string> d = new Dictionary<string, string>();
 
             // verify we get a different-but-equal object back
-            object body = facade.Body;
+            object body = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body);
             Dictionary<string, string> returnedObject1 = (Dictionary<string, string>) body;
             Assert.AreNotSame(origMap, returnedObject1, "Expected different objects, due to snapshot being taken");
@@ -210,7 +210,7 @@
             origMap.Add("key2", "value2");
 
             // verify we get a different-but-equal object back when compared to the previously retrieved object
-            object body2 = facade.Body;
+            object body2 = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body2);
             Dictionary<string, string> returnedObject2 = (Dictionary<string, string>) body2;
             Assert.AreNotSame(origMap, returnedObject2, "Expected different objects, due to snapshot being taken");
@@ -246,7 +246,7 @@
 
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
-            Assert.Null(facade.Body, "Expected null object");
+            Assert.Null(facade.Object, "Expected null object");
         }
 
         [Test]
@@ -265,7 +265,7 @@
 
             Assert.Catch<IllegalStateException>(() =>
             {
-                object body = facade.Body;
+                object body = facade.Object;
             });
         }
 
@@ -289,7 +289,7 @@
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
             // verify we get a different-but-equal object back
-            object body = facade.Body;
+            object body = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body);
             Dictionary<string, string> returnedObject1 = (Dictionary<string, string>) body;
             Assert.AreNotSame(origMap, returnedObject1, "Expected different objects, due to snapshot being taken");
@@ -297,7 +297,7 @@
 
 
             // verify we get a different-but-equal object back when compared to the previously retrieved object
-            object body2 = facade.Body;
+            object body2 = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body2);
             Dictionary<string, string> returnedObject2 = (Dictionary<string, string>) body2;
             Assert.AreNotSame(origMap, returnedObject2, "Expected different objects, due to snapshot being taken");
@@ -323,7 +323,7 @@
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
             // verify we get a different-but-equal object back
-            object body = facade.Body;
+            object body = facade.Object;
             Assert.IsInstanceOf<Map>(body);
             Map returnedObject1 = (Map) body;
             Assert.AreNotSame(origMap, returnedObject1, "Expected different objects, due to snapshot being taken");
@@ -331,7 +331,7 @@
 
 
             // verify we get a different-but-equal object back when compared to the previously retrieved object
-            object body2 = facade.Body;
+            object body2 = facade.Object;
             Assert.IsInstanceOf<Map>(body2);
             Map returnedObject2 = (Map) body2;
             Assert.AreNotSame(origMap, returnedObject2, "Expected different objects, due to snapshot being taken");
@@ -351,11 +351,11 @@
             String content = "myStringContent";
 
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade();
-            amqpObjectMessageFacade.Body = content;
+            amqpObjectMessageFacade.Object = content;
 
             AmqpNmsObjectMessageFacade copy = amqpObjectMessageFacade.Copy() as AmqpNmsObjectMessageFacade;
             Assert.IsNotNull(copy);
-            Assert.AreEqual(amqpObjectMessageFacade.Body, copy.Body);
+            Assert.AreEqual(amqpObjectMessageFacade.Object, copy.Object);
         }
 
         private static byte[] GetSerializedBytes(object content)
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
index e8c3f6f..b5688d6 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
@@ -20,7 +20,7 @@
 
 namespace NMS.AMQP.Test.TestAmqp
 {
-    class NLogAdapter : ITrace
+    public class NLogAdapter : ITrace
     {
         private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
 
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 0e8f74f..eccf3e9 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -51,6 +51,7 @@
             SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
             SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
             SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY,
+            SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS
         };
 
         private const int CONNECTION_CHANNEL = 0;
@@ -554,6 +555,7 @@
                         Handle = context.Command.Handle,
                         LinkName = context.Command.LinkName,
                         Target = context.Command.Target,
+                        OfferedCapabilities = new []{ SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS }
                     };
 
                     if (refuseLink)
@@ -604,7 +606,49 @@
 
             ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
         }
+        
+        public void ExpectSharedDurableSubscriberAttach(string topicName, string subscriptionName)
+        {
+            Action<string> linkNameMatcher = linkName => Assert.AreEqual(subscriptionName, linkName);
 
+            Action<object> sourceMatcher = o =>
+            {
+                Assert.IsInstanceOf<Source>(o);
+                var source = (Source) o;
+                Assert.AreEqual(topicName, source.Address);
+                Assert.IsFalse(source.Dynamic);
+                Assert.AreEqual(TerminusDurability.UNSETTLED_STATE, (TerminusDurability) source.Durable);
+                Assert.AreEqual(TerminusExpiryPolicy.NEVER, source.ExpiryPolicy);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.ATTACH_CAPABILITIES_TOPIC);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.SHARED);
+            };
+
+            Action<Target> targetMatcher = Assert.IsNotNull;
+
+            ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
+        }
+
+        public void ExpectSharedSubscriberAttach(string topicName, string subscriptionName)
+        {
+            Action<string> linkNameMatcher = linkName => Assert.AreEqual(subscriptionName+"|volatile1", linkName);
+
+            Action<object> sourceMatcher = o =>
+            {
+                Assert.IsInstanceOf<Source>(o);
+                var source = (Source) o;
+                Assert.AreEqual(topicName, source.Address);
+                Assert.IsFalse(source.Dynamic);
+                // Assert.AreEqual(TerminusDurability.UNSETTLED_STATE, (TerminusDurability) source.Durable);
+                // Assert.AreEqual(TerminusExpiryPolicy.NEVER, source.ExpiryPolicy);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.ATTACH_CAPABILITIES_TOPIC);
+                CollectionAssert.Contains(source.Capabilities, SymbolUtil.SHARED);
+            };
+
+            Action<Target> targetMatcher = Assert.IsNotNull;
+
+            ExpectReceiverAttach(linkNameMatcher: linkNameMatcher, sourceMatcher: sourceMatcher, targetMatcher: targetMatcher, settled: false, errorType: null, errorMessage: null);
+        }
+        
         public void ExpectDetach(bool expectClosed, bool sendResponse, bool replyClosed, Symbol errorType = null, String errorMessage = null)
         {
             var detachMatcher = new FrameMatcher<Detach>()