AMQNET-637 NMS 2.0 - Add NMS 2.0 method implementations.
diff --git a/README.md b/README.md
index 17050b8..0a3cccd 100644
--- a/README.md
+++ b/README.md
@@ -70,17 +70,21 @@
 | IConnection | Y | The ConnectionInterruptedListener event and the ConnectionResumedListener are not supported. |
 | ProducerTransformerDelegate | N | Any member access should throw a NotSupportedException. |
 | ConsumerTransformerDelegate | N | Any member access should throw a NotSupportedException. |
-| ISession | Y | 
+| ISession | Y | |
+[ INMSContext | Y | |
 | IQueue | Y | |
 | ITopic | Y | |
 | ITemporaryQueue | Y | |
 | ITemporaryTopic | Y | |
 | IMessageProducer | Y * | Anonymous producers are only supported on connections with the ANONYMOUS-RELAY capability. |
+| INMSProducer | Y | |
 | MsgDeliveryMode.Persistent | Y | Producers will block on send until an outcome is received or will timeout after waiting the RequestTimeout timespan amount. Exceptions may be throw depending on the outcome or if the producer times out. |
 | MsgDeliveryMode.NonPersistent | Y | Producers will not block on send nor expect to receive an outcome. Should an exception be raised from the outcome the exception will be delivered using the the connection ExceptionListener. |
 | IMessageConsumer | Y | |
+| INMSConsumer | Y | |
 | Durable Consumers | Y | |
-| IQueueBrowser | N | The provider will throw NotImplementedException for the ISession create methods. |
+| Shared Consumers | Y | |
+| IQueueBrowser | Y | |
 | Configurable NMSMessageID and amqp serializtion | N | For future consideration. The prodiver will generate a MessageID from a sequence and serialize it as a string. |
 | Flow control configuration | N | For future consideration. The provider will use amqpnetlite defaults except for initial link credits which is 200. |
 | Object Deserialization Policy | N | For future consideration. The provider considers all Dotnet serialized objects in Object Message bodies are safe to deserialize. |
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index 872fcdc..7b4ac02 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -81,7 +81,7 @@
     <ItemGroup>
         <!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
         <PackageReference Include="AMQPNetLite.Core" Version="2.4.0" />
-        <PackageReference Include="Apache.NMS" Version="1.8.0" />
+        <PackageReference Include="Apache.NMS" Version="2.0.0" />
         <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
     </ItemGroup>
 </Project>
diff --git a/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
index bfb90ca..8561a57 100644
--- a/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
@@ -35,6 +35,7 @@
         IDestination NMSReplyTo { get; set; }
         DateTime NMSTimestamp { get; set; }
         string NMSType { get; set; }
+        DateTime DeliveryTime { get; set; }
         string GroupId { get; set; }
         uint GroupSequence { get; set; }
         DateTime? Expiration { get; set; }
@@ -52,5 +53,7 @@
         object ProviderMessageIdObject { get; set; }
 
         INmsMessageFacade Copy();
+
+        bool HasBody();
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
index cea2edc..c6535aa 100644
--- a/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
@@ -19,6 +19,6 @@
 {
     public interface INmsObjectMessageFacade : INmsMessageFacade
     {
-        object Body { get; set; }        
+        object Object { get; set; }        
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsBytesMessage.cs b/src/NMS.AMQP/Message/NmsBytesMessage.cs
index 62eaa5b..199e716 100644
--- a/src/NMS.AMQP/Message/NmsBytesMessage.cs
+++ b/src/NMS.AMQP/Message/NmsBytesMessage.cs
@@ -449,5 +449,19 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return !facade.HasBody() || type.IsAssignableFrom(typeof(byte[]));
+        }
+        
+        protected override T DoGetBody<T>() {
+            if (!facade.HasBody()) {
+                return default;
+            }
+
+            object o = Content;
+            return (T) o;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMapMessage.cs b/src/NMS.AMQP/Message/NmsMapMessage.cs
index 0d74176..53005f2 100644
--- a/src/NMS.AMQP/Message/NmsMapMessage.cs
+++ b/src/NMS.AMQP/Message/NmsMapMessage.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using Apache.NMS.AMQP.Message.Facade;
 using Apache.NMS.Util;
 
@@ -55,5 +56,18 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return !facade.HasBody() || type.IsAssignableFrom(typeof(IPrimitiveMap));
+        }
+        
+        protected override T DoGetBody<T>() {
+            if (!facade.HasBody()) {
+                return default;
+            }
+
+            return (T) Body;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessage.cs b/src/NMS.AMQP/Message/NmsMessage.cs
index 16871ab..1ca0f14 100644
--- a/src/NMS.AMQP/Message/NmsMessage.cs
+++ b/src/NMS.AMQP/Message/NmsMessage.cs
@@ -33,7 +33,9 @@
 
         public INmsMessageFacade Facade { get; }
 
-        public IPrimitiveMap Properties => properties ?? (properties = new MessagePropertyIntercepter(this, Facade.Properties, IsReadOnlyProperties));
+        public IPrimitiveMap Properties => properties ??
+                                           (properties = new MessagePropertyIntercepter(this, Facade.Properties,
+                                               IsReadOnlyProperties));
 
         public string NMSCorrelationID
         {
@@ -109,6 +111,12 @@
             set => Facade.NMSType = value;
         }
 
+        public DateTime NMSDeliveryTime
+        {
+            get => Facade.DeliveryTime;
+            set => Facade.DeliveryTime = value;
+        }
+
         public string NMSXGroupId
         {
             get => Facade.GroupId;
@@ -255,5 +263,25 @@
             target.IsReadOnlyProperties = IsReadOnlyProperties;
             target.NmsAcknowledgeCallback = NmsAcknowledgeCallback;
         }
+
+        public virtual bool IsBodyAssignableTo(Type type)
+        {
+            return true;
+        }
+
+        public T Body<T>()
+        {
+            if (IsBodyAssignableTo(typeof(T)))
+            {
+                return DoGetBody<T>();
+            }
+
+            throw new MessageFormatException("Message body cannot be read as type: " + typeof(T));
+        }
+
+        protected virtual T DoGetBody<T>()
+        {
+            return default;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessageTransformation.cs b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
index ef1acb1..b9c4a3c 100644
--- a/src/NMS.AMQP/Message/NmsMessageTransformation.cs
+++ b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
@@ -16,6 +16,7 @@
  */
 
 using System.Collections;
+using Apache.NMS.AMQP.Util.Types.Map;
 
 namespace Apache.NMS.AMQP.Message
 {
@@ -106,7 +107,7 @@
             CopyMap(source.Properties, target.Properties);
         }
 
-        private static void CopyMap(IPrimitiveMap source, IPrimitiveMap target)
+        public static void CopyMap(IPrimitiveMap source, IPrimitiveMap target)
         {
             foreach (object key in source.Keys)
             {
@@ -151,6 +152,12 @@
                     case IDictionary dictionaryValue:
                         target.SetDictionary(name, dictionaryValue);
                         break;
+                    case object objectValue:
+                        if (target is PrimitiveMapBase primitiveMapBase)
+                        {
+                            primitiveMapBase.SetObject(name, objectValue);
+                        }
+                        break;
                 }
             }
         }
diff --git a/src/NMS.AMQP/Message/NmsObjectMessage.cs b/src/NMS.AMQP/Message/NmsObjectMessage.cs
index 556f9c2..87f0dd7 100644
--- a/src/NMS.AMQP/Message/NmsObjectMessage.cs
+++ b/src/NMS.AMQP/Message/NmsObjectMessage.cs
@@ -16,6 +16,8 @@
  */
 
 using System;
+using System.Reflection;
+using System.Runtime.CompilerServices;
 using Apache.NMS.AMQP.Message.Facade;
 
 namespace Apache.NMS.AMQP.Message
@@ -31,13 +33,13 @@
 
         public object Body
         {
-            get => this.facade.Body;
+            get => this.facade.Object;
             set
             {
                 CheckReadOnlyBody();
                 try
                 {
-                    this.facade.Body = value;
+                    this.facade.Object = value;
                 }
                 catch (Exception e)
                 {
@@ -57,5 +59,22 @@
             CopyInto(copy);
             return copy;
         }
+
+
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            if (!facade.HasBody())
+            {
+                return true;
+            }
+
+            return type.IsInstanceOfType(Body);
+        }
+        
+        protected override T DoGetBody<T>()
+        {
+            return (T) Body;
+        }
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsStreamMessage.cs b/src/NMS.AMQP/Message/NmsStreamMessage.cs
index 03d6150..0e085a0 100644
--- a/src/NMS.AMQP/Message/NmsStreamMessage.cs
+++ b/src/NMS.AMQP/Message/NmsStreamMessage.cs
@@ -465,5 +465,11 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return false;
+        }
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsTextMessage.cs b/src/NMS.AMQP/Message/NmsTextMessage.cs
index 9334a36..7932207 100644
--- a/src/NMS.AMQP/Message/NmsTextMessage.cs
+++ b/src/NMS.AMQP/Message/NmsTextMessage.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using Apache.NMS.AMQP.Message.Facade;
 
 namespace Apache.NMS.AMQP.Message
@@ -49,5 +50,16 @@
             CopyInto(copy);
             return copy;
         }
+        
+        public override bool IsBodyAssignableTo(Type type)
+        {
+            return !facade.HasBody() || type.IsAssignableFrom(typeof(string));
+        }
+        
+        protected override T DoGetBody<T>()
+        {
+            object o = Text;
+            return (T) o;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
index 0df66eb..70cf529 100644
--- a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
@@ -34,7 +34,9 @@
         public string Selector { get; set; }
         public bool NoLocal { get; set; }
         public string SubscriptionName { get; set; }
+        public bool IsExplicitClientId { get; set; }
         public bool IsDurable { get; set; }
+        public bool IsShared { get; set; }
         public bool LocalMessageExpiry { get; set; }
         public bool IsBrowser { get; set; }
         public int LinkCredit { get; set; } = DEFAULT_CREDIT;
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index 7f3f1c5..88101bb 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -181,6 +181,26 @@
             }

         }

 

+        public INMSContext CreateContext()

+        {

+            return new NmsContext((NmsConnection)CreateConnection(), AcknowledgementMode.AutoAcknowledge);

+        }

+

+        public INMSContext CreateContext(AcknowledgementMode acknowledgementMode)

+        {

+            return new NmsContext((NmsConnection)CreateConnection(), acknowledgementMode);

+        }

+

+        public INMSContext CreateContext(string userName, string password)

+        {

+            return new NmsContext((NmsConnection)CreateConnection(userName, password), AcknowledgementMode.AutoAcknowledge);

+        }

+

+        public INMSContext CreateContext(string userName, string password, AcknowledgementMode acknowledgementMode)

+        {

+            return new NmsContext((NmsConnection)CreateConnection(userName, password), acknowledgementMode);

+        }

+

         public Uri BrokerUri

         {

             get => brokerUri;

@@ -247,7 +267,7 @@
             }

             else

             {

-                connectionInfo.SetClientId(ClientIdGenerator.GenerateId().ToString(), false);

+                connectionInfo.SetClientId(ClientIdGenerator.GenerateId(), false);

             }

 

             return connectionInfo;

diff --git a/src/NMS.AMQP/NmsConsumer.cs b/src/NMS.AMQP/NmsConsumer.cs
new file mode 100644
index 0000000..9a2d40d
--- /dev/null
+++ b/src/NMS.AMQP/NmsConsumer.cs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsConsumer : INMSConsumer
+    {
+        
+        private readonly ISession session;
+        private readonly NmsMessageConsumer consumer;
+
+        public NmsConsumer(ISession session, NmsMessageConsumer consumer) {
+            this.session = session;
+            this.consumer = consumer;
+        }
+
+        public void Dispose()
+        {
+            consumer.Dispose();
+        }
+
+        public IMessage Receive()
+        {
+            return consumer.Receive();
+        }
+
+        public IMessage Receive(TimeSpan timeout)
+        {
+            return consumer.Receive(timeout);
+        }
+
+        public IMessage ReceiveNoWait()
+        {
+            return consumer.ReceiveNoWait();
+        }
+
+        public T ReceiveBody<T>()
+        {
+            return consumer.ReceiveBody<T>();
+        }
+
+        public T ReceiveBody<T>(TimeSpan timeout)
+        {
+            return consumer.ReceiveBody<T>(timeout);
+        }
+
+        public T ReceiveBodyNoWait<T>()
+        {
+            return consumer.ReceiveBodyNoWait<T>();
+        }
+
+        public void Close()
+        {
+            consumer.Close();
+        }
+
+        public string MessageSelector => consumer.MessageSelector;
+
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get => consumer.ConsumerTransformer; 
+            set => consumer.ConsumerTransformer = value; 
+        }
+
+        event MessageListener INMSConsumer.Listener
+        {
+            add => ((IMessageConsumer)consumer).Listener += value;
+            remove => ((IMessageConsumer)consumer).Listener -= value;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsContext.cs b/src/NMS.AMQP/NmsContext.cs
new file mode 100644
index 0000000..882f583
--- /dev/null
+++ b/src/NMS.AMQP/NmsContext.cs
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsContext : INMSContext
+    {
+        private readonly object syncRoot = new object();
+
+        private readonly NmsConnection connection;
+        private readonly AtomicLong connectionRefCount;
+        public AcknowledgementMode AcknowledgementMode { get; }
+
+        private NmsSession session;
+        private NmsMessageProducer sharedProducer;
+        private bool autoStart = true;
+
+        public NmsContext(NmsConnection connection, AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.AcknowledgementMode = acknowledgementMode;
+            this.connectionRefCount = new AtomicLong(1);
+        }
+
+        private NmsContext(NmsConnection connection, AcknowledgementMode acknowledgementMode,
+            AtomicLong connectionRefCount)
+        {
+            this.connection = connection;
+            this.AcknowledgementMode = acknowledgementMode;
+            this.connectionRefCount = connectionRefCount;
+        }
+
+        public void Dispose()
+        {
+            connection.Dispose();
+        }
+
+        public void Start()
+        {
+            connection.Start();
+        }
+
+        public bool IsStarted { get => connection.IsStarted; }
+        
+        public void Stop()
+        {
+            connection.Stop();
+        }
+
+        public INMSContext CreateContext(AcknowledgementMode acknowledgementMode)
+        {
+            if (connectionRefCount.Get() == 0) {
+                throw new IllegalStateException("The Connection is closed");
+            }
+            
+            connectionRefCount.IncrementAndGet();
+            return new NmsContext(connection, acknowledgementMode, connectionRefCount);
+        }
+
+        public INMSProducer CreateProducer()
+        {
+            if (sharedProducer == null) {
+                lock (syncRoot) {
+                    if (sharedProducer == null) {
+                        sharedProducer = (NmsMessageProducer) GetSession().CreateProducer();
+                    }
+                }
+            }
+            return new NmsProducer(GetSession(), sharedProducer);
+        }
+
+        public INMSConsumer CreateConsumer(IDestination destination)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination)));
+        }
+
+        public INMSConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination, selector)));
+        }
+
+        public INMSConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination, selector, noLocal)));
+        }
+
+        public INMSConsumer CreateDurableConsumer(ITopic destination, string subscriptionName)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateDurableConsumer(destination, subscriptionName)));
+        }
+
+        public INMSConsumer CreateDurableConsumer(ITopic destination, string subscriptionName, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateDurableConsumer(destination, subscriptionName, selector)));
+        }
+
+        public INMSConsumer CreateDurableConsumer(ITopic destination, string subscriptionName, string selector, bool noLocal)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateDurableConsumer(destination, subscriptionName, selector, noLocal)));
+        }
+
+        public INMSConsumer CreateSharedConsumer(ITopic destination, string subscriptionName)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedConsumer(destination, subscriptionName)));
+        }
+
+        public INMSConsumer CreateSharedConsumer(ITopic destination, string subscriptionName, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedConsumer(destination, subscriptionName, selector)));
+        }
+
+        public INMSConsumer CreateSharedDurableConsumer(ITopic destination, string subscriptionName)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedDurableConsumer(destination, subscriptionName)));
+        }
+
+        public INMSConsumer CreateSharedDurableConsumer(ITopic destination, string subscriptionName, string selector)
+        {
+            return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateSharedDurableConsumer(destination, subscriptionName, selector)));
+        }
+
+        public void Unsubscribe(string name)
+        {
+            GetSession().Unsubscribe(name);
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            return GetSession().CreateBrowser(queue);
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            return GetSession().CreateBrowser(queue, selector);
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            return GetSession().GetQueue(name);
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            return GetSession().GetTopic(name);
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            return GetSession().CreateTemporaryQueue();
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            return GetSession().CreateTemporaryTopic();
+        }
+
+        public IMessage CreateMessage()
+        {
+            return GetSession().CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return GetSession().CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            return GetSession().CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return GetSession().CreateMapMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            return GetSession().CreateObjectMessage(body);
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return GetSession().CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return GetSession().CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return GetSession().CreateStreamMessage();
+        }
+
+        public void Close()
+        {
+            NMSException failure = null;
+
+            try
+            {
+                session?.Close();
+            } catch (NMSException jmse)
+            {
+                failure = jmse;
+            }
+
+            if (connectionRefCount.DecrementAndGet() == 0) {
+                try {
+                    connection.Close();
+                } catch (NMSException jmse) {
+                    if (failure == null)
+                    {
+                        failure = jmse;
+                    }
+                }
+            }
+
+            if (failure != null) {
+                throw failure;
+            }
+        }
+
+        public void Recover()
+        {
+            GetSession().Recover();
+        }
+
+        public void Acknowledge()
+        {
+            GetSession().Acknowledge();
+        }
+
+        public void Commit()
+        {
+            GetSession().Commit();
+        }
+
+        public void Rollback()
+        {
+            GetSession().Rollback();
+        }
+
+        public void PurgeTempDestinations()
+        {
+            connection.PurgeTempDestinations();
+        }
+        
+        
+        private NmsSession GetSession() {
+            if (session == null) {
+                lock (syncRoot) {
+                    if (session == null)
+                    {
+                        session = (NmsSession) connection.CreateSession(AcknowledgementMode);
+                    }
+                }
+            }
+            return session;
+        }
+        
+        private NmsConsumer StartIfNeeded(NmsConsumer consumer) {
+            if (autoStart) {
+                connection.Start();
+            }
+            return consumer;
+        }
+        
+
+        public ConsumerTransformerDelegate ConsumerTransformer { get => session.ConsumerTransformer; set => session.ConsumerTransformer = value; }
+        
+        public ProducerTransformerDelegate ProducerTransformer { get => session.ProducerTransformer; set => session.ProducerTransformer = value; }
+        
+        public TimeSpan RequestTimeout { get => session.RequestTimeout; set => session.RequestTimeout = value; }
+        
+        public bool Transacted => session.Transacted;
+        
+        public string ClientId { get => connection.ClientId; set => connection.ClientId = value; }
+        
+        public bool AutoStart { get => autoStart; set => autoStart = value; }
+        
+        public event SessionTxEventDelegate TransactionStartedListener;
+        
+        public event SessionTxEventDelegate TransactionCommittedListener;
+        
+        public event SessionTxEventDelegate TransactionRolledBackListener;
+        
+        public event ExceptionListener ExceptionListener;
+        
+        public event ConnectionInterruptedListener ConnectionInterruptedListener;
+        
+        public event ConnectionResumedListener ConnectionResumedListener;
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs b/src/NMS.AMQP/NmsDurableMessageConsumer.cs
similarity index 82%
rename from src/NMS.AMQP/NmsDurableTopicSubscriber.cs
rename to src/NMS.AMQP/NmsDurableMessageConsumer.cs
index bc446bb..60ef3c8 100644
--- a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
+++ b/src/NMS.AMQP/NmsDurableMessageConsumer.cs
@@ -19,16 +19,19 @@
 
 namespace Apache.NMS.AMQP
 {
-    public class NmsDurableTopicSubscriber : NmsMessageConsumer
+    public class NmsDurableMessageConsumer : NmsMessageConsumer
     {
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+        public NmsDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
         {
         }
 
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+        public NmsDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
         {
         }
 
         protected override bool IsDurableSubscription => true;
+        
+        protected override bool IsSharedSubscription => false;
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index 121a93c..36abe8f 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -55,9 +55,13 @@
                 Destination = destination,
                 Selector = selector,
                 NoLocal = noLocal,
+                IsExplicitClientId = Session.Connection.ConnectionInfo.IsExplicitClientId,
                 SubscriptionName = name,
-                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry,
-                IsDurable = IsDurableSubscription
+                IsShared = IsSharedSubscription,
+                IsDurable = IsDurableSubscription,
+                IsBrowser =  IsBrowser,
+                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry
+
             };
             deliveryTask = new MessageDeliveryTask(this);
             
@@ -74,6 +78,10 @@
         public IDestination Destination => Info.Destination;
 
         protected virtual bool IsDurableSubscription => false;
+        
+        protected virtual bool IsSharedSubscription => false;
+        
+        protected virtual bool IsBrowser => false;
 
         public void Dispose()
         {
@@ -101,6 +109,8 @@
 
         public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
 
+        public string MessageSelector => Info.Selector;
+
         event MessageListener IMessageConsumer.Listener
         {
             add
@@ -134,6 +144,21 @@
                 }
             }
         }
+        
+        public T ReceiveBody<T>()
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            while (true)
+            {
+                if (started)
+                {
+                    return ReceiveBodyInternal<T>(-1);
+                }
+            }
+        }
+
 
         public IMessage ReceiveNoWait()
         {
@@ -142,6 +167,14 @@
 
             return started ? ReceiveInternal(0) : null;
         }
+        
+        public T ReceiveBodyNoWait<T>()
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            return started ? ReceiveBodyInternal<T>(0) : default;
+        }
 
         public IMessage Receive(TimeSpan timeout)
         {
@@ -171,6 +204,35 @@
                 }
             }
         }
+        
+        public T ReceiveBody<T>(TimeSpan timeout)
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            int timeoutInMilliseconds = (int) timeout.TotalMilliseconds;
+
+            if (started)
+            {
+                return ReceiveBodyInternal<T>(timeoutInMilliseconds);
+            }
+
+            long deadline = GetDeadline(timeoutInMilliseconds);
+
+            while (true)
+            {
+                timeoutInMilliseconds = (int) (deadline - DateTime.UtcNow.Ticks / 10_000L);
+                if (timeoutInMilliseconds < 0)
+                {
+                    return default;
+                }
+
+                if (started)
+                {
+                    return ReceiveBodyInternal<T>(timeoutInMilliseconds);
+                }
+            }
+        }
 
         private void CheckMessageListener()
         {
@@ -331,6 +393,42 @@
 
         private IMessage ReceiveInternal(int timeout)
         {
+            return ReceiveInternal(timeout, envelope =>
+            {
+                IMessage message = envelope.Message.Copy();
+                AckFromReceive(envelope);
+                return message;
+            });
+        }
+        
+        private T ReceiveBodyInternal<T>(int timeout)
+        {
+            return ReceiveInternal<T>(timeout, envelope =>
+            {
+                try
+                {
+                    T body = envelope.Message.Body<T>();
+                    AckFromReceive(envelope);
+                    return body;
+                }
+                catch (MessageFormatException mfe)
+                {
+                    // Should behave as if receiveBody never happened in these modes.
+                    if (acknowledgementMode == AcknowledgementMode.AutoAcknowledge ||
+                        acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge) {
+
+                        envelope.EnqueueFirst = true;
+                        OnInboundMessage(envelope);
+                    }
+
+                    throw mfe;
+                }
+            });
+        }
+
+
+        private T ReceiveInternal<T>(int timeout, Func<InboundMessageDispatch, T> func)
+        {
             try
             {
                 long deadline = 0;
@@ -352,7 +450,7 @@
                         throw NMSExceptionSupport.Create(failureCause);
 
                     if (envelope == null)
-                        return null;
+                        return default;
 
                     if (IsMessageExpired(envelope))
                     {
@@ -383,8 +481,7 @@
                             Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
                         }
 
-                        AckFromReceive(envelope);
-                        return envelope.Message.Copy();
+                        return func.Invoke(envelope);
                     }
                 }
             }
@@ -397,6 +494,7 @@
                 throw ExceptionSupport.Wrap(ex, "Receive failed");
             }
         }
+        
 
         private static long GetDeadline(int timeout)
         {
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index e1b40ba..20a706c 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -31,6 +31,7 @@
         private readonly AtomicLong messageSequence = new AtomicLong();
 
         private Exception failureCause;
+        private TimeSpan deliveryDelay = TimeSpan.Zero;
         private MsgDeliveryMode deliveryMode = MsgDeliveryMode.Persistent;
         private TimeSpan timeToLive = NMSConstants.defaultTimeToLive;
         private TimeSpan requestTimeout;
@@ -84,8 +85,73 @@
 
         public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
         {
+            Send(destination, message, deliveryMode, priority, timeToLive, null);
+        }
+
+        public void Send(IMessage message, CompletionListener completionListener)
+        {
+            Send(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);        
+        }
+
+        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive,
+            CompletionListener completionListener)
+        {
+            Send(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);        
+        }
+
+        public void Send(IDestination destination, IMessage message, CompletionListener completionListener)
+        {
+            Send(destination, message, deliveryMode, priority, timeToLive, completionListener);        
+        }
+
+        public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
+            TimeSpan timeToLive, CompletionListener completionListener)
+        {
             CheckClosed();
-            session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp);
+            session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay, completionListener);
+        }
+
+        public Task SendAsync(IMessage message)
+        {
+            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
+        }
+        
+        public Task SendAsync(IMessage message, CompletionListener completionListener)
+        {
+            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
+        }
+
+        public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
+        }
+        
+        public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, CompletionListener completionListener)
+        {
+            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
+        }
+
+        public Task SendAsync(IDestination destination, IMessage message)
+        {
+            return SendAsync(destination, message, deliveryMode, priority, timeToLive);
+        }
+        
+        public Task SendAsync(IDestination destination, IMessage message, CompletionListener completionListener)
+        {
+            return SendAsync(destination, message, deliveryMode, priority, timeToLive, completionListener);
+        }
+        
+        public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
+            TimeSpan timeToLive)
+        {
+            return SendAsync(destination, message, deliveryMode, priority, timeToLive, null);
+        }
+
+        public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
+            TimeSpan timeToLive, CompletionListener completionListener)
+        {
+            CheckClosed();
+            return session.SendAsync(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay, null);
         }
 
         public void Close()
@@ -231,6 +297,20 @@
             }
         }
 
+        public TimeSpan DeliveryDelay
+        {
+            get
+            {
+                CheckClosed();
+                return deliveryDelay;
+            }
+            set
+            {
+                CheckClosed();
+                deliveryDelay = value;
+            }
+        }
+        
         public Task OnConnectionRecovery(IProvider provider)
         {
             return provider.CreateResource(Info);
diff --git a/src/NMS.AMQP/NmsProducer.cs b/src/NMS.AMQP/NmsProducer.cs
new file mode 100644
index 0000000..bc6ed17
--- /dev/null
+++ b/src/NMS.AMQP/NmsProducer.cs
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Threading.Tasks;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsProducer : INMSProducer
+    {
+        
+        private readonly ISession session;
+        private readonly NmsMessageProducer producer;
+
+        private CompletionListener completionListener;
+
+        // Message Headers
+        private String correlationId;
+        private String type;
+        private IDestination replyTo;
+
+        // Message Properties
+        private readonly IPrimitiveMap messageProperties = new PrimitiveMap();
+
+        /**
+         * Create a new JMSProducer instance.
+         *
+         * The producer is backed by the given Session object and uses the shared MessageProducer
+         * instance to send all of its messages.
+         *
+         * @param session
+         *      The Session that created this JMSProducer
+         * @param producer
+         *      The shared MessageProducer owned by the parent Session.
+         */
+        public NmsProducer(ISession session, NmsMessageProducer producer) {
+            this.session = session;
+            this.producer = producer;
+        }
+
+        
+        public void Dispose()
+        {
+            producer.Dispose();
+        }
+
+        public INMSProducer Send(IDestination destination, IMessage message)  {
+
+            if (message == null) {
+                throw new MessageFormatException("Message must not be null");
+            }
+            
+            NmsMessageTransformation.CopyMap(messageProperties, message.Properties);
+            
+            if (correlationId != null) {
+                message.NMSCorrelationID = correlationId;
+            }
+            if (type != null) {
+                message.NMSType = type;
+            }
+            if (replyTo != null) {
+                message.NMSReplyTo = replyTo;
+            }
+            
+            producer.Send(destination, message, completionListener);
+            
+            return this;
+        }
+
+        public INMSProducer Send(IDestination destination, string body)
+        {
+            return Send(destination, CreateTextMessage(body));
+        }
+
+        public INMSProducer Send(IDestination destination, IPrimitiveMap body)
+        {
+            IMapMessage message = CreateMapMessage();
+            NmsMessageTransformation.CopyMap(body, message.Body);
+            return Send(destination, message);
+        }
+
+        public INMSProducer Send(IDestination destination, byte[] body)
+        {
+            return Send(destination, CreateBytesMessage(body));
+        }
+
+        public INMSProducer Send(IDestination destination, object body)
+        {
+            return Send(destination, CreateObjectMessage(body));
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, IMessage message)
+        {
+            if (message == null) {
+                throw new MessageFormatException("Message must not be null");
+            }
+            
+            NmsMessageTransformation.CopyMap(messageProperties, message.Properties);
+            
+            if (correlationId != null) {
+                message.NMSCorrelationID = correlationId;
+            }
+            if (type != null) {
+                message.NMSType = type;
+            }
+            if (replyTo != null) {
+                message.NMSReplyTo = replyTo;
+            }
+
+            Task task = producer.SendAsync(destination, message, completionListener);
+            return task.ContinueWith(t => (INMSProducer) this);
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, string body)
+        {
+            return SendAsync(destination, CreateTextMessage(body));
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, IPrimitiveMap body)
+        {
+            IMapMessage message = CreateMapMessage();
+            NmsMessageTransformation.CopyMap(body, message.Body);
+            return SendAsync(destination, message);
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, byte[] body)
+        {
+            return SendAsync(destination, CreateBytesMessage(body));
+        }
+
+        public Task<INMSProducer> SendAsync(IDestination destination, object body)
+        {
+            return SendAsync(destination, CreateObjectMessage(body));
+        }
+        
+        public CompletionListener CompletionListener
+        {
+            get => completionListener;
+            set => completionListener = value;
+        }
+
+        public INMSProducer SetCompletionListener(CompletionListener completionListener)
+        {
+            CompletionListener = completionListener;
+            return this;
+        }
+
+        public INMSProducer ClearProperties()
+        {
+            messageProperties.Clear();
+            return this;
+        }
+
+
+        public IMessage CreateMessage()
+        {
+            return session.CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return session.CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            return session.CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return session.CreateMapMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            return session.CreateObjectMessage(body);
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return session.CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return session.CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
+
+        public void Close()
+        {
+            producer.Close();
+        }
+
+
+        public string NMSCorrelationID
+        {
+            get => correlationId;
+            set => correlationId = value;
+        }
+        
+        public INMSProducer SetNMSCorrelationID(string correlationID)
+        {
+            NMSCorrelationID = correlationID;
+            return this;
+        }
+
+
+        public IDestination NMSReplyTo
+        {
+            get => replyTo;
+            set => replyTo = value;
+        }
+
+        public INMSProducer SetNMSReplyTo(IDestination replyTo)
+        {
+            NMSReplyTo = replyTo;
+            return this;
+        }
+        
+        public string NMSType
+        {
+            get => type;
+            set => type = value;
+        }
+
+        public INMSProducer SetNMSType(string type)
+        {
+            NMSType = type;
+            return this;
+        }
+
+        public MsgDeliveryMode DeliveryMode
+        {
+            get => producer.DeliveryMode;
+            set => producer.DeliveryMode = value;
+        }
+        
+        public INMSProducer SetDeliveryMode(MsgDeliveryMode deliveryMode)
+        {
+            DeliveryMode = deliveryMode;
+            return this;
+        }
+
+        public TimeSpan TimeToLive
+        {
+            get => producer.TimeToLive;
+            set => producer.TimeToLive = value;
+        }
+
+        public INMSProducer SetTimeToLive(TimeSpan timeToLive)
+        {
+            TimeToLive = timeToLive;
+            return this;
+        }
+
+        public TimeSpan RequestTimeout
+        {
+            get => producer.RequestTimeout;
+            set => producer.RequestTimeout = value;
+        }
+
+        public MsgPriority Priority
+        {
+            get => producer.Priority;
+            set => producer.Priority = value;
+        }
+        
+        public INMSProducer SetPriority(MsgPriority priority)
+        {
+            Priority = priority;
+            return this;
+        }
+
+        public bool DisableMessageID
+        {
+            get => producer.DisableMessageID;
+            set => producer.DisableMessageID = value;
+        }
+        
+        public INMSProducer SetDisableMessageID(bool value)
+        {
+            DisableMessageID = value;
+            return this;
+        }
+
+        public bool DisableMessageTimestamp
+        {
+            get => producer.DisableMessageTimestamp;
+            set => producer.DisableMessageTimestamp = value;
+        }
+
+        public INMSProducer SetDisableMessageTimestamp(bool value)
+        {
+            DisableMessageTimestamp = value;
+            return this;
+        }
+
+        public TimeSpan DeliveryDelay
+        {
+            get => producer.DeliveryDelay;
+            set => producer.DeliveryDelay = value;
+        }
+        
+        public INMSProducer SetDeliveryDelay(TimeSpan deliveryDelay)
+        {
+            DeliveryDelay = deliveryDelay;
+            return this;
+        }
+        
+        public IPrimitiveMap Properties => messageProperties;
+
+        public INMSProducer SetProperty(string name, bool value)
+        {
+            messageProperties.SetBool(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, byte value)
+        {
+            messageProperties.SetByte(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, double value)
+        {
+            messageProperties.SetDouble(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, float value)
+        {
+            messageProperties.SetFloat(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, int value)
+        {
+            messageProperties.SetInt(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, long value)
+        {
+            messageProperties.SetLong(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, short value)
+        {
+            messageProperties.SetShort(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, char value)
+        {
+            messageProperties.SetChar(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, string value)
+        {
+            messageProperties.SetString(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, byte[] value)
+        {
+            messageProperties.SetBytes(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, IList value)
+        {
+            messageProperties.SetList(name, value);
+            return this;
+        }
+
+        public INMSProducer SetProperty(string name, IDictionary value)
+        {
+            messageProperties.SetDictionary(name, value);
+            return this;
+        }
+
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get => producer.ProducerTransformer; 
+            set => producer.ProducerTransformer = value;
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsQueueBrowser.cs b/src/NMS.AMQP/NmsQueueBrowser.cs
new file mode 100644
index 0000000..e1eb348
--- /dev/null
+++ b/src/NMS.AMQP/NmsQueueBrowser.cs
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsQueueBrowser : IQueueBrowser, IEnumerator
+    {
+        private readonly object syncRoot = new object();
+
+        private readonly NmsSession session;
+        private readonly IQueue destination;
+        private readonly string selector;
+
+        private volatile NmsMessageConsumer consumer;
+
+        private IMessage current;
+        private readonly AtomicBool closed = new AtomicBool();
+        
+        public NmsQueueBrowser(NmsSession session, IQueue destination, string selector)
+        {
+            this.session = session;
+            this.destination = destination;
+            this.selector = selector;
+        }
+
+        public IEnumerator GetEnumerator()
+        {
+            CheckClosed();
+            CreateConsumer();
+
+            return this;
+        }
+
+        public bool MoveNext()
+        {
+            current = Next();
+
+            if (!session.IsStarted) {
+                DestroyConsumer();
+                return false;
+            }
+            
+            return current != null;
+        }
+        
+        private IMessage Next() {
+            while (true) {
+                IMessageConsumer consumer = this.consumer;
+                if (consumer == null) {
+                    return null;
+                }
+
+                IMessage next = null;
+
+                try {
+                    next = consumer.ReceiveNoWait();
+                } catch (NMSException e) {
+                    Tracer.WarnFormat("Error while receive the next message: {}", e.Message);
+                }
+
+                if (next == null) {
+                    DestroyConsumer();
+                }
+
+                return next;
+            }
+        }
+
+        public void Reset()
+        {
+            CheckClosed();
+            DestroyConsumer();
+            CreateConsumer();
+        }
+
+        public object Current
+        {
+            get => current;
+        }
+
+        public void Dispose()
+        {
+            Close();
+        }
+
+        public void Close()
+        {
+            if (closed.CompareAndSet(false, true)) {
+                DestroyConsumer();
+            }
+        }
+
+        public string MessageSelector => selector;
+        public IQueue Queue => destination;
+
+        private void CheckClosed()
+        {
+            if (closed)
+            {
+                throw new IllegalStateException("The MessageConsumer is closed");
+            }
+        }
+
+        private void CreateConsumer()
+        {
+            lock (syncRoot)
+            {
+                if (consumer == null)
+                {
+                    NmsMessageConsumer messageConsumer = new NmsQueueBrowserMessageConsumer(session.GetNextConsumerId(), session,
+                        destination, selector, false);
+
+                    messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
+
+                    // Assign only after fully created and initialized.
+                    consumer = messageConsumer;
+                }
+            }
+        }
+        
+        private void DestroyConsumer()
+        {
+            lock (syncRoot)
+            {
+                try
+                {
+                    consumer?.Close();
+                }
+                catch (NMSException e)
+                {
+                    Tracer.DebugFormat("Error closing down internal consumer: ", e);
+                }
+                finally
+                {
+                    consumer = null;
+                }
+            }
+        }
+
+        public class NmsQueueBrowserMessageConsumer : NmsMessageConsumer
+        {
+            public NmsQueueBrowserMessageConsumer(NmsConsumerId consumerId, NmsSession session,
+                IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination,
+                selector, noLocal)
+            {
+            }
+
+            protected override bool IsBrowser => true;
+
+        }
+    }
+}
+
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 425cf51..e90db35 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -144,23 +144,68 @@
             return messageConsumer;
         }
 
-        private NmsConsumerId GetNextConsumerId()
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name)
         {
-            return new NmsConsumerId(SessionInfo.Id, consumerIdGenerator.IncrementAndGet());
+            return CreateDurableConsumer(destination, name, null, false);
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector)
+        {
+            return CreateDurableConsumer(destination, name, selector, false);
         }
 
         public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
         {
             CheckClosed();
 
-            NmsMessageConsumer messageConsumer = new NmsDurableTopicSubscriber(GetNextConsumerId(), this, destination, name, selector, noLocal);
+            NmsMessageConsumer messageConsumer = new NmsDurableMessageConsumer(GetNextConsumerId(), this, destination, name, selector, noLocal);
             messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
 
             return messageConsumer;
         }
 
+        public IMessageConsumer CreateSharedConsumer(ITopic destination, string name)
+        {
+            return CreateSharedConsumer(destination, name, null);
+        }
+
+        public IMessageConsumer CreateSharedConsumer(ITopic destination, string name, string selector)
+        {
+            CheckClosed();
+
+            NmsMessageConsumer messageConsumer = new NmsSharedMessageConsumer(GetNextConsumerId(), this, destination, name, selector, false);
+            messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
+            
+            return messageConsumer;
+        }
+
+        public IMessageConsumer CreateSharedDurableConsumer(ITopic destination, string name)
+        {
+            return CreateSharedDurableConsumer(destination, name, null);
+        }
+
+        public IMessageConsumer CreateSharedDurableConsumer(ITopic destination, string name, string selector)
+        {
+            CheckClosed();
+
+            NmsMessageConsumer messageConsumer = new NmsSharedDurableMessageConsumer(GetNextConsumerId(), this, destination, name, selector, false);
+            messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
+            
+            return messageConsumer;
+        }
+
+        public NmsConsumerId GetNextConsumerId()
+        {
+            return new NmsConsumerId(SessionInfo.Id, consumerIdGenerator.IncrementAndGet());
+        }
+
         public void DeleteDurableConsumer(string name)
         {
+            Unsubscribe(name);
+        }
+
+        public void Unsubscribe(string name)
+        {
             CheckClosed();
 
             Connection.Unsubscribe(name);
@@ -168,12 +213,14 @@
 
         public IQueueBrowser CreateBrowser(IQueue queue)
         {
-            throw new NotImplementedException();
+            return CreateBrowser(queue, null);
         }
 
         public IQueueBrowser CreateBrowser(IQueue queue, string selector)
         {
-            throw new NotImplementedException();
+            CheckClosed();
+
+            return new NmsQueueBrowser(this, queue, selector);
         }
 
         public IQueue GetQueue(string name)
@@ -288,6 +335,13 @@
                 Start();
         }
 
+        public void Acknowledge()
+        {
+            if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) {
+                Acknowledge(AckType.ACCEPTED);
+            }
+        }
+
         public void Commit()
         {
             CheckClosed();
@@ -396,8 +450,20 @@
             Connection.Acknowledge(envelope, ackType).ConfigureAwait(false).GetAwaiter().GetResult();
         }
 
-        public void Send(NmsMessageProducer producer, IDestination destination, IMessage original, MsgDeliveryMode deliveryMode,
-            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp)
+        public void Send(NmsMessageProducer producer, IDestination destination, IMessage original,
+            MsgDeliveryMode deliveryMode,
+            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay, CompletionListener completionListener)
+        {
+            Task task = SendAsync(producer, destination, original, deliveryMode, priority, timeToLive, disableMessageId,
+                disableMessageTimestamp, deliveryDelay, completionListener);
+            if (completionListener == null)
+            {
+                task.ConfigureAwait(false).GetAwaiter().GetResult();
+            }
+        }
+
+        public Task SendAsync(NmsMessageProducer producer, IDestination destination, IMessage original, MsgDeliveryMode deliveryMode,
+            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay, CompletionListener completionListener)
         {
             if (destination == null)
                 throw new InvalidDestinationException("Destination must not be null");
@@ -416,6 +482,7 @@
             DateTime timeStamp = DateTime.UtcNow;
 
             bool hasTTL = timeToLive > TimeSpan.Zero;
+            bool hasDelay = deliveryDelay > TimeSpan.Zero;
 
             if (!disableMessageTimestamp)
             {
@@ -446,6 +513,11 @@
                 original.NMSMessageId = outbound.NMSMessageId;
             }
 
+            if (hasDelay)
+            {
+                outbound.Facade.DeliveryTime = timeStamp + deliveryDelay;
+            }
+
             if (hasTTL)
                 outbound.Facade.Expiration = timeStamp + timeToLive;
             else
@@ -455,13 +527,32 @@
 
             bool sync = deliveryMode == MsgDeliveryMode.Persistent;
 
-            TransactionContext.Send(new OutboundMessageDispatch
+            Task task = TransactionContext.Send(new OutboundMessageDispatch
             {
                 Message = outbound,
                 ProducerId = producer.Info.Id,
                 ProducerInfo = producer.Info,
                 SendAsync = !sync
-            }).ConfigureAwait(false).GetAwaiter().GetResult();
+            });
+
+            if (completionListener == null)
+            {
+                return task;
+            }
+            else
+            {
+                return task.ContinueWith(t =>
+                {
+                    if (t.IsFaulted)
+                        completionListener.Invoke(original, t.Exception.InnerException);
+                    else if (t.IsCanceled)
+                        completionListener.Invoke(original, new TaskCanceledException());
+                    else
+                        completionListener.Invoke(original, null);
+
+                });
+            }
+
         }
 
         internal void EnqueueForDispatch(NmsMessageConsumer.MessageDeliveryTask task)
diff --git a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs b/src/NMS.AMQP/NmsSharedDurableMessageConsumer.cs
similarity index 62%
copy from src/NMS.AMQP/NmsDurableTopicSubscriber.cs
copy to src/NMS.AMQP/NmsSharedDurableMessageConsumer.cs
index bc446bb..4a838bd 100644
--- a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
+++ b/src/NMS.AMQP/NmsSharedDurableMessageConsumer.cs
@@ -19,16 +19,19 @@
 
 namespace Apache.NMS.AMQP
 {
-    public class NmsDurableTopicSubscriber : NmsMessageConsumer
+    public class NmsSharedDurableMessageConsumer : NmsMessageConsumer
     {
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+        public NmsSharedDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
         {
         }
 
-        public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+        public NmsSharedDurableMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
         {
         }
 
         protected override bool IsDurableSubscription => true;
+        
+        protected override bool IsSharedSubscription => true;
+
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsSharedMessageConsumer.cs b/src/NMS.AMQP/NmsSharedMessageConsumer.cs
new file mode 100644
index 0000000..392a6a7
--- /dev/null
+++ b/src/NMS.AMQP/NmsSharedMessageConsumer.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.AMQP.Meta;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsSharedMessageConsumer : NmsMessageConsumer
+    {
+        public NmsSharedMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+        {
+        }
+
+        public NmsSharedMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+        {
+        }
+
+        protected override bool IsDurableSubscription => false;
+        
+        protected override bool IsSharedSubscription => true;
+
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 4692551..a90e1a7 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -114,7 +114,8 @@
             {
                 SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
                 SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
-                SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
+                SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY,
+                SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS
             };
         }
 
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 7ecf7e7..7eb5d1c 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -56,6 +56,7 @@
         }
 
         public NmsConsumerId ConsumerId => this.info.Id;
+        
 
         public Task Attach()
         {
@@ -148,7 +149,25 @@
                 source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
             }
 
-            source.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(info.Destination) };
+            
+            IList<Symbol> capabilities = new List<Symbol>();
+            Symbol typeCapability = SymbolUtil.GetTerminusCapabilitiesForDestination(info.Destination);
+            if (typeCapability != null)
+            {
+                capabilities.Add(typeCapability);
+            }
+            
+            if (info.IsShared) {
+                capabilities.Add(SymbolUtil.SHARED);
+
+                if(!info.IsExplicitClientId) {
+                    capabilities.Add(SymbolUtil.GLOBAL);
+                }
+            }
+
+            if (capabilities.Any()) {
+                source.Capabilities = capabilities.ToArray();
+            }
 
             Map filters = new Map();
             
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
index 4185271..e61ddfb 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
@@ -163,7 +163,7 @@
             Message.BodySection = EMPTY_DATA;
         }
 
-        public virtual bool HasBody()
+        public override bool HasBody()
         {
             if (byteOut != null)
                 return byteOut.BaseStream.Length > 0;
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
index 7a588de..ae09084 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
@@ -67,7 +67,7 @@
             }
         }
 
-        public virtual bool HasBody()
+        public override bool HasBody()
         {
             return Map.Count > 0;
         }
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
index a6f53a7..d5332f1 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Runtime.CompilerServices;
 using System.Text;
 using Amqp;
 using Amqp.Framing;
@@ -37,6 +38,7 @@
         private IDestination consumerDestination;
         private IAmqpConnection connection;
         private DateTime? syntheticExpiration;
+        private DateTime syntheticDeliveryTime;
         public global::Amqp.Message Message { get; private set; }
 
         public int RedeliveryCount
@@ -254,6 +256,31 @@
             }
         }
 
+        public DateTime DeliveryTime
+        {
+            get
+            {
+                object deliveryTime = GetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME);
+                switch (deliveryTime)
+                {
+                    case DateTime time:
+                        return time;
+                    case long _:
+                    case ulong _:
+                    case int _:
+                    case uint _:
+                        return new DateTime(621355968000000000L + (long) deliveryTime * 10000L, DateTimeKind.Utc);
+                    default:
+                        return syntheticDeliveryTime;
+                }
+            }
+            set
+            {
+                syntheticDeliveryTime = value;
+                RemoveMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME);
+            }
+        }
+
         public Header Header => Message.Header;
 
         public string GroupId
@@ -395,6 +422,12 @@
             {
                 syntheticExpiration = DateTime.UtcNow + ttl;
             }
+
+            if (GetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME) == null)
+            {
+                syntheticDeliveryTime = DateTime.UtcNow;
+            }
+            
         }
 
         protected virtual void InitializeBody()
@@ -445,6 +478,11 @@
             return copy;
         }
 
+        public virtual bool HasBody()
+        {
+            return false;
+        }
+
         protected void CopyInto(AmqpNmsMessageFacade target)
         {
             target.connection = connection;
@@ -475,6 +513,13 @@
             LazyCreateMessageAnnotations();
             MessageAnnotations.Map.Add(symbolKeyName, value);
         }
+        
+        
+        public void RemoveMessageAnnotation(Symbol symbolKeyName)
+        {
+            if (Message.MessageAnnotations == null) return;
+            MessageAnnotations.Map.Remove(symbolKeyName);
+        }
 
         private void LazyCreateMessageAnnotations()
         {
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs
index 3e64d71..d8454fa 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsObjectMessageFacade.cs
@@ -29,7 +29,7 @@
 
         public IAmqpObjectTypeDelegate Delegate => typeDelegate;
 
-        public object Body
+        public object Object
         {
             get => Delegate.Object;
             set => Delegate.Object = value;
@@ -47,7 +47,7 @@
         {
             try
             {
-                Body = null;
+                Object = null;
             }
             catch (IOException e)
             {
@@ -91,5 +91,10 @@
             copy.typeDelegate = typeDelegate;
             return copy;
         }
+
+        public override bool HasBody()
+        {
+            return Object != null;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
index a7f63c9..18fc051 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
@@ -138,7 +138,7 @@
             return emptyList;
         }
 
-        public virtual bool HasBody() => !IsEmpty;
+        public override bool HasBody() => !IsEmpty;
 
         public override void ClearBody()
         {
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
index 371efcd..dd3cf81 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
@@ -71,7 +71,7 @@
             SetTextBody(null);
         }
 
-        public virtual bool HasBody()
+        public override bool HasBody()
         {
             try
             {
diff --git a/src/NMS.AMQP/Util/AtomicLong.cs b/src/NMS.AMQP/Util/AtomicLong.cs
index d43fae2..207179b 100644
--- a/src/NMS.AMQP/Util/AtomicLong.cs
+++ b/src/NMS.AMQP/Util/AtomicLong.cs
@@ -32,6 +32,16 @@
         {
             return Interlocked.Increment(ref value);
         }
+        
+        public long DecrementAndGet()
+        {
+            return Interlocked.Decrement(ref value);
+        }
+        
+        public long Get()
+        {
+            return Interlocked.Read(ref value);
+        }
 
         public static implicit operator long(AtomicLong atomicLong)
         {
diff --git a/src/NMS.AMQP/Util/SymbolUtil.cs b/src/NMS.AMQP/Util/SymbolUtil.cs
index 807bc7f..bb64387 100644
--- a/src/NMS.AMQP/Util/SymbolUtil.cs
+++ b/src/NMS.AMQP/Util/SymbolUtil.cs
@@ -39,6 +39,7 @@
         public static readonly Symbol OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER = new Symbol("sole-connection-for-container");
         public static readonly Symbol OPEN_CAPABILITY_ANONYMOUS_RELAY = new Symbol("ANONYMOUS-RELAY");
         public static readonly Symbol OPEN_CAPABILITY_DELAYED_DELIVERY = new Symbol("DELAYED_DELIVERY");
+        public static readonly Symbol OPEN_CAPABILITY_SHARED_SUBS = new Symbol("SHARED_SUBS");
 
         // Attach Frame 
         public readonly static Symbol ATTACH_EXPIRY_POLICY_LINK_DETACH = new Symbol("link-detach");
@@ -62,6 +63,8 @@
         public static readonly Symbol JMSX_OPT_DEST = new Symbol("x-opt-jms-dest");
         public static readonly Symbol JMSX_OPT_REPLY_TO = new Symbol("x-opt-jms-reply-to");
 
+        public static readonly Symbol NMS_DELIVERY_TIME = new Symbol("x-opt-delivery-time");
+
         // Frame Property Value
         public readonly static Symbol BOOLEAN_TRUE = new Symbol("true");
         public readonly static Symbol BOOLEAN_FALSE = new Symbol("false");
@@ -122,6 +125,21 @@
             // unknown destination type...
             return null;
         }
+        
+        public static bool IsNumber(object value)
+        {
+            return value is sbyte
+                   || value is byte
+                   || value is short
+                   || value is ushort
+                   || value is int
+                   || value is uint
+                   || value is long
+                   || value is ulong
+                   || value is float
+                   || value is double
+                   || value is decimal;
+        }
 
     }
 }
diff --git a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
index 94b8723..40b2453 100644
--- a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
+++ b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
@@ -95,9 +95,9 @@
             }
         }
 
-        protected override object GetObjectProperty(string key) => properties[key];
+        public override object GetObject(string key) => properties[key];
 
-        protected override void SetObjectProperty(string key, object value)
+        public override void SetObject(string key, object value)
         {
             object objval = value;
 
diff --git a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs
index c527e1f..8826461 100644
--- a/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs
+++ b/src/NMS.AMQP/Util/Types/Map/AMQP/AMQPValueMap.cs
@@ -102,7 +102,7 @@
         /// </summary>
         /// <param name="key">Key to associated value.</param>
         /// <returns>Value for given Key.</returns>
-        protected override object GetObjectProperty(string key)
+        public override object GetObject(string key)
         {
             return this.value[key];
         }
@@ -112,7 +112,7 @@
         /// </summary>
         /// <param name="key">Key to associated value.</param>
         /// <param name="value">Value to set.</param>
-        protected override void SetObjectProperty(string key, object value)
+        public override void SetObject(string key, object value)
         {
             object objval = value;
             if(objval is IDictionary)
diff --git a/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs b/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs
index bc1943e..d51f5fb 100644
--- a/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs
+++ b/src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs
@@ -52,25 +52,25 @@
 
         public object this[string key]
         {
-            get { return GetObjectProperty(key); }
+            get { return GetObject(key); }
 
             set
             {
                 CheckValidType(value);
-                SetObjectProperty(key, value);
+                SetObject(key, value);
             }
         }
 
         public bool GetBool(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(bool));
             return (bool) value;
         }
 
         public byte GetByte(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(byte));
             return (byte) value;
         }
@@ -79,7 +79,7 @@
 
         public char GetChar(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(char));
             return (char) value;
         }
@@ -88,21 +88,21 @@
 
         public double GetDouble(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(double));
             return (double) value;
         }
 
         public float GetFloat(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(float));
             return (float) value;
         }
 
         public int GetInt(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(int));
             return (int) value;
         }
@@ -111,7 +111,7 @@
 
         private T GetComplexType<T>(string key) where T : class
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             if (value is null)
                 return null;
             if (value is T complexValue)
@@ -122,90 +122,90 @@
 
         public long GetLong(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(long));
             return (long) value;
         }
 
         public short GetShort(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(short));
             return (short) value;
         }
 
         public string GetString(string key)
         {
-            object value = GetObjectProperty(key);
+            object value = GetObject(key);
             CheckValueType(value, typeof(string));
             return (string) value;
         }
 
         public void SetBool(string key, bool value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetByte(string key, byte value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetBytes(string key, byte[] value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetBytes(string key, byte[] value, int offset, int length)
         {
             byte[] copy = new byte[length];
             Array.Copy(value, offset, copy, 0, length);
-            SetObjectProperty(key, copy);
+            SetObject(key, copy);
         }
 
         public void SetChar(string key, char value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetDictionary(string key, IDictionary dictionary)
         {
-            SetObjectProperty(key, dictionary);
+            SetObject(key, dictionary);
         }
 
         public void SetDouble(string key, double value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetFloat(string key, float value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetInt(string key, int value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetList(string key, IList list)
         {
-            SetObjectProperty(key, list);
+            SetObject(key, list);
         }
 
         public void SetLong(string key, long value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetShort(string key, short value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         public void SetString(string key, string value)
         {
-            SetObjectProperty(key, value);
+            SetObject(key, value);
         }
 
         #endregion
@@ -213,8 +213,8 @@
         #region Protected Abstract Methods
 
         internal abstract object SyncRoot { get; }
-        protected abstract object GetObjectProperty(string key);
-        protected abstract void SetObjectProperty(string key, object value);
+        public abstract object GetObject(string key);
+        public abstract void SetObject(string key, object value);
 
         #endregion
 
@@ -262,7 +262,7 @@
                     }
 
                     first = false;
-                    object value = GetObjectProperty(key);
+                    object value = GetObject(key);
                     result = key + "=" + value;
                 }
             }
diff --git a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
index 69ff292..89b4874 100644
--- a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
@@ -60,6 +60,7 @@
         public IDestination NMSReplyTo { get; set; }
         public DateTime NMSTimestamp { get; set; }
         public string NMSType { get; set; }
+        public DateTime DeliveryTime { get; set; }
         public string GroupId { get; set; }
         public uint GroupSequence { get; set; }
         public DateTime? Expiration { get; set; }
@@ -71,5 +72,10 @@
         {
             return null;
         }
+
+        public bool HasBody()
+        {
+            return true;
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs
index 2ef3bfb..29a6f18 100644
--- a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestObjectMessageFacade.cs
@@ -21,11 +21,11 @@
 {
     public class NmsTestObjectMessageFacade : NmsTestMessageFacade, INmsObjectMessageFacade
     {
-        public object Body { get; set; }
+        public object Object { get; set; }
 
         public override void ClearBody()
         {
-            Body = null;
+            Object = null;
         }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs b/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs
index c31eb4d..e9f0c38 100644
--- a/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/Foreign/ForeignNmsMessage.cs
@@ -41,6 +41,16 @@
             message.ClearProperties();
         }
 
+        public T Body<T>()
+        {
+            return message.Body<T>();
+        }
+
+        public bool IsBodyAssignableTo(Type type)
+        {
+            return message.IsBodyAssignableTo(type);
+        }
+
         public IPrimitiveMap Properties => message.Properties;
 
         public string NMSCorrelationID
@@ -102,5 +112,11 @@
             get => message.NMSType;
             set => message.NMSType = value;
         }
+
+        public DateTime NMSDeliveryTime
+        {
+            get => message.NMSDeliveryTime;
+            set => message.NMSDeliveryTime = value;
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs b/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs
index b2b5f65..fdd32a9 100644
--- a/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/NmsObjectMessageTest.cs
@@ -39,7 +39,7 @@
         {
             string content = "myStringContent";
             NmsTestObjectMessageFacade facade = new NmsTestObjectMessageFacade();
-            facade.Body = content;
+            facade.Object = content;
             NmsObjectMessage objectMessage = new NmsObjectMessage(facade);
             objectMessage.OnDispatch();
 
@@ -51,7 +51,7 @@
         {
             string content = "myStringContent";
             NmsTestObjectMessageFacade facade = new NmsTestObjectMessageFacade();
-            facade.Body = content;
+            facade.Object = content;
             NmsObjectMessage objectMessage = new NmsObjectMessage(facade);
             objectMessage.OnDispatch();
 
@@ -65,14 +65,14 @@
         {
             string content = "myStringContent";
             NmsTestObjectMessageFacade facade = new NmsTestObjectMessageFacade();
-            facade.Body = content;
+            facade.Object = content;
             NmsObjectMessage objectMessage = new NmsObjectMessage(facade);
             objectMessage.OnDispatch();
 
             Assert.NotNull(objectMessage.Body);
             objectMessage.ClearBody();
 
-            Assert.Null(facade.Body);
+            Assert.Null(facade.Object);
         }
 
         [Test]
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs
index 562048f..8ac51f4 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpMessageFactoryTest.cs
@@ -123,7 +123,7 @@
             Assert.IsInstanceOf<AmqpNmsObjectMessageFacade>(facade);
             Assert.IsNull(facade.JmsMsgType);
 
-            Assert.IsNull(((AmqpNmsObjectMessageFacade) facade).Body);
+            Assert.IsNull(((AmqpNmsObjectMessageFacade) facade).Object);
         }
 
 
@@ -141,8 +141,8 @@
 
             AmqpNmsObjectMessageFacade objectMessageFacade = (AmqpNmsObjectMessageFacade) facade;
 
-            Assert.IsNotNull(objectMessageFacade.Body);
-            Assert.IsInstanceOf<SerializableClass>(objectMessageFacade.Body);
+            Assert.IsNotNull(objectMessageFacade.Object);
+            Assert.IsInstanceOf<SerializableClass>(objectMessageFacade.Object);
         }
 
         [Test]
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs
index 29a077f..2a9b224 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsObjectMessageFacadeTest.cs
@@ -57,7 +57,7 @@
         private void DoNewMessageToSendReturnsNullObjectTestImpl(bool amqpTyped)
         {
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade(amqpTyped);
-            Assert.Null(amqpObjectMessageFacade.Body);
+            Assert.Null(amqpObjectMessageFacade.Object);
         }
 
         [Test]
@@ -96,7 +96,7 @@
             String content = "myStringContent";
 
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade();
-            amqpObjectMessageFacade.Body = content;
+            amqpObjectMessageFacade.Object = content;
 
             var bytes = GetSerializedBytes(content);
 
@@ -118,7 +118,7 @@
             String content = "myStringContent";
 
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade(true);
-            amqpObjectMessageFacade.Body = content;
+            amqpObjectMessageFacade.Object = content;
 
             // retrieve the body from the underlying message, check it matches expectation
             RestrictedDescribed section = amqpObjectMessageFacade.Message.BodySection;
@@ -149,9 +149,9 @@
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
             Assert.NotNull(facade.Message.BodySection, "Expected existing body section to be found");
-            facade.Body = null;
+            facade.Object = null;
             Assert.AreSame(AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, facade.Message.BodySection, "Expected existing body section to be replaced");
-            Assert.Null(facade.Body);
+            Assert.Null(facade.Object);
         }
 
         /*
@@ -179,7 +179,7 @@
             facade.ClearBody();
 
             Assert.AreSame(AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, facade.Message.BodySection, "Expected existing body section to be replaced");
-            Assert.Null(facade.Body);
+            Assert.Null(facade.Object);
         }
 
         /*
@@ -195,12 +195,12 @@
             };
 
             AmqpNmsObjectMessageFacade facade = CreateNewObjectMessageFacade(false);
-            facade.Body = origMap;
+            facade.Object = origMap;
 
             Dictionary<string, string> d = new Dictionary<string, string>();
 
             // verify we get a different-but-equal object back
-            object body = facade.Body;
+            object body = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body);
             Dictionary<string, string> returnedObject1 = (Dictionary<string, string>) body;
             Assert.AreNotSame(origMap, returnedObject1, "Expected different objects, due to snapshot being taken");
@@ -210,7 +210,7 @@
             origMap.Add("key2", "value2");
 
             // verify we get a different-but-equal object back when compared to the previously retrieved object
-            object body2 = facade.Body;
+            object body2 = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body2);
             Dictionary<string, string> returnedObject2 = (Dictionary<string, string>) body2;
             Assert.AreNotSame(origMap, returnedObject2, "Expected different objects, due to snapshot being taken");
@@ -246,7 +246,7 @@
 
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
-            Assert.Null(facade.Body, "Expected null object");
+            Assert.Null(facade.Object, "Expected null object");
         }
 
         [Test]
@@ -265,7 +265,7 @@
 
             Assert.Catch<IllegalStateException>(() =>
             {
-                object body = facade.Body;
+                object body = facade.Object;
             });
         }
 
@@ -289,7 +289,7 @@
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
             // verify we get a different-but-equal object back
-            object body = facade.Body;
+            object body = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body);
             Dictionary<string, string> returnedObject1 = (Dictionary<string, string>) body;
             Assert.AreNotSame(origMap, returnedObject1, "Expected different objects, due to snapshot being taken");
@@ -297,7 +297,7 @@
 
 
             // verify we get a different-but-equal object back when compared to the previously retrieved object
-            object body2 = facade.Body;
+            object body2 = facade.Object;
             Assert.IsInstanceOf<Dictionary<string, string>>(body2);
             Dictionary<string, string> returnedObject2 = (Dictionary<string, string>) body2;
             Assert.AreNotSame(origMap, returnedObject2, "Expected different objects, due to snapshot being taken");
@@ -323,7 +323,7 @@
             AmqpNmsObjectMessageFacade facade = CreateReceivedObjectMessageFacade(message);
 
             // verify we get a different-but-equal object back
-            object body = facade.Body;
+            object body = facade.Object;
             Assert.IsInstanceOf<Map>(body);
             Map returnedObject1 = (Map) body;
             Assert.AreNotSame(origMap, returnedObject1, "Expected different objects, due to snapshot being taken");
@@ -331,7 +331,7 @@
 
 
             // verify we get a different-but-equal object back when compared to the previously retrieved object
-            object body2 = facade.Body;
+            object body2 = facade.Object;
             Assert.IsInstanceOf<Map>(body2);
             Map returnedObject2 = (Map) body2;
             Assert.AreNotSame(origMap, returnedObject2, "Expected different objects, due to snapshot being taken");
@@ -351,11 +351,11 @@
             String content = "myStringContent";
 
             AmqpNmsObjectMessageFacade amqpObjectMessageFacade = CreateNewObjectMessageFacade();
-            amqpObjectMessageFacade.Body = content;
+            amqpObjectMessageFacade.Object = content;
 
             AmqpNmsObjectMessageFacade copy = amqpObjectMessageFacade.Copy() as AmqpNmsObjectMessageFacade;
             Assert.IsNotNull(copy);
-            Assert.AreEqual(amqpObjectMessageFacade.Body, copy.Body);
+            Assert.AreEqual(amqpObjectMessageFacade.Object, copy.Object);
         }
 
         private static byte[] GetSerializedBytes(object content)
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 0e8f74f..164f141 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -51,6 +51,7 @@
             SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
             SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
             SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY,
+            SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS
         };
 
         private const int CONNECTION_CHANNEL = 0;