AMQNET-637 Remove CompletionListener in favour of Task based Async, as well complete some left over bits on shared subscriptions and subscription tracking.
diff --git a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
index a4115c8..2aefdb1 100644
--- a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
+++ b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
@@ -25,6 +25,6 @@
         public NmsProducerId ProducerId { get; set; }
         public NmsProducerInfo ProducerInfo { get; set; }
         public NmsMessage Message { get; set; }
-        public bool SendAsync { get; set; }
+        public bool FireAndForget { get; set; }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index 24f3c13..f57c5d4 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -64,6 +64,12 @@
         public int MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
         public int IdleTimeOut { get; set; } = DEFAULT_IDLE_TIMEOUT;
         
+        public bool AnonymousRelaySupported { get; set; }
+        
+        public bool DelayedDeliverySupported { get; set; }
+        
+        public bool SharedSubsSupported { get; set; }
+        
 
         public void SetClientId(string clientId, bool explicitClientId)
         {
diff --git a/src/NMS.AMQP/NmsContext.cs b/src/NMS.AMQP/NmsContext.cs
index 882f583..260647e 100644
--- a/src/NMS.AMQP/NmsContext.cs
+++ b/src/NMS.AMQP/NmsContext.cs
@@ -86,6 +86,7 @@
             return new NmsProducer(GetSession(), sharedProducer);
         }
 
+
         public INMSConsumer CreateConsumer(IDestination destination)
         {
             return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination)));
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index 20a706c..6257177 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -85,73 +85,30 @@
 
         public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
         {
-            Send(destination, message, deliveryMode, priority, timeToLive, null);
-        }
-
-        public void Send(IMessage message, CompletionListener completionListener)
-        {
-            Send(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);        
-        }
-
-        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive,
-            CompletionListener completionListener)
-        {
-            Send(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);        
-        }
-
-        public void Send(IDestination destination, IMessage message, CompletionListener completionListener)
-        {
-            Send(destination, message, deliveryMode, priority, timeToLive, completionListener);        
-        }
-
-        public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
-            TimeSpan timeToLive, CompletionListener completionListener)
-        {
             CheckClosed();
-            session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay, completionListener);
+            session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay);
         }
 
         public Task SendAsync(IMessage message)
         {
             return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
         }
-        
-        public Task SendAsync(IMessage message, CompletionListener completionListener)
-        {
-            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
-        }
 
         public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
         {
             return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
         }
-        
-        public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, CompletionListener completionListener)
-        {
-            return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
-        }
 
         public Task SendAsync(IDestination destination, IMessage message)
         {
             return SendAsync(destination, message, deliveryMode, priority, timeToLive);
         }
-        
-        public Task SendAsync(IDestination destination, IMessage message, CompletionListener completionListener)
-        {
-            return SendAsync(destination, message, deliveryMode, priority, timeToLive, completionListener);
-        }
-        
+
         public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
             TimeSpan timeToLive)
         {
-            return SendAsync(destination, message, deliveryMode, priority, timeToLive, null);
-        }
-
-        public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
-            TimeSpan timeToLive, CompletionListener completionListener)
-        {
             CheckClosed();
-            return session.SendAsync(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay, null);
+            return session.SendAsync(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay);
         }
 
         public void Close()
diff --git a/src/NMS.AMQP/NmsProducer.cs b/src/NMS.AMQP/NmsProducer.cs
index bc6ed17..974a185 100644
--- a/src/NMS.AMQP/NmsProducer.cs
+++ b/src/NMS.AMQP/NmsProducer.cs
@@ -28,9 +28,7 @@
         
         private readonly ISession session;
         private readonly NmsMessageProducer producer;
-
-        private CompletionListener completionListener;
-
+        
         // Message Headers
         private String correlationId;
         private String type;
@@ -79,7 +77,7 @@
                 message.NMSReplyTo = replyTo;
             }
             
-            producer.Send(destination, message, completionListener);
+            producer.Send(destination, message);
             
             return this;
         }
@@ -106,12 +104,12 @@
             return Send(destination, CreateObjectMessage(body));
         }
 
-        public Task<INMSProducer> SendAsync(IDestination destination, IMessage message)
+        public async Task<INMSProducer> SendAsync(IDestination destination, IMessage message)
         {
             if (message == null) {
                 throw new MessageFormatException("Message must not be null");
             }
-            
+
             NmsMessageTransformation.CopyMap(messageProperties, message.Properties);
             
             if (correlationId != null) {
@@ -124,8 +122,8 @@
                 message.NMSReplyTo = replyTo;
             }
 
-            Task task = producer.SendAsync(destination, message, completionListener);
-            return task.ContinueWith(t => (INMSProducer) this);
+            await producer.SendAsync(destination, message);
+            return this;
         }
 
         public Task<INMSProducer> SendAsync(IDestination destination, string body)
@@ -149,18 +147,6 @@
         {
             return SendAsync(destination, CreateObjectMessage(body));
         }
-        
-        public CompletionListener CompletionListener
-        {
-            get => completionListener;
-            set => completionListener = value;
-        }
-
-        public INMSProducer SetCompletionListener(CompletionListener completionListener)
-        {
-            CompletionListener = completionListener;
-            return this;
-        }
 
         public INMSProducer ClearProperties()
         {
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index e90db35..e1bb898 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -452,18 +452,16 @@
 
         public void Send(NmsMessageProducer producer, IDestination destination, IMessage original,
             MsgDeliveryMode deliveryMode,
-            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay, CompletionListener completionListener)
+            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay)
         {
-            Task task = SendAsync(producer, destination, original, deliveryMode, priority, timeToLive, disableMessageId,
-                disableMessageTimestamp, deliveryDelay, completionListener);
-            if (completionListener == null)
-            {
-                task.ConfigureAwait(false).GetAwaiter().GetResult();
-            }
+
+            SendAsync(producer, destination, original, deliveryMode, priority, timeToLive, disableMessageId,
+                disableMessageTimestamp, deliveryDelay).ConfigureAwait(false).GetAwaiter().GetResult();
+            
         }
 
         public Task SendAsync(NmsMessageProducer producer, IDestination destination, IMessage original, MsgDeliveryMode deliveryMode,
-            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay, CompletionListener completionListener)
+            MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay)
         {
             if (destination == null)
                 throw new InvalidDestinationException("Destination must not be null");
@@ -525,34 +523,15 @@
 
             outbound.OnSend(timeToLive);
 
-            bool sync = deliveryMode == MsgDeliveryMode.Persistent;
+            bool fireAndForget = deliveryMode == MsgDeliveryMode.NonPersistent;
 
-            Task task = TransactionContext.Send(new OutboundMessageDispatch
+            return TransactionContext.Send(new OutboundMessageDispatch
             {
                 Message = outbound,
                 ProducerId = producer.Info.Id,
                 ProducerInfo = producer.Info,
-                SendAsync = !sync
+                FireAndForget = fireAndForget
             });
-
-            if (completionListener == null)
-            {
-                return task;
-            }
-            else
-            {
-                return task.ContinueWith(t =>
-                {
-                    if (t.IsFaulted)
-                        completionListener.Invoke(original, t.Exception.InnerException);
-                    else if (t.IsCanceled)
-                        completionListener.Invoke(original, new TaskCanceledException());
-                    else
-                        completionListener.Invoke(original, null);
-
-                });
-            }
-
         }
 
         internal void EnqueueForDispatch(NmsMessageConsumer.MessageDeliveryTask task)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index a90e1a7..1e86ced 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -21,6 +21,7 @@
 using System.Threading.Tasks;
 using Amqp;
 using Amqp.Framing;
+using Amqp.Types;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
 using Apache.NMS.AMQP.Provider.Amqp.Message;
@@ -63,6 +64,8 @@
         public string TopicPrefix => Info.TopicPrefix;
         public bool ObjectMessageUsesAmqpTypes { get; set; } = false;
         public NmsConnectionInfo Info { get; }
+        
+        public AmqpSubscriptionTracker SubscriptionTracker { get; } = new AmqpSubscriptionTracker();
 
         public INmsMessageFactory MessageFactory => messageFactory;
 
@@ -128,6 +131,30 @@
             }
             else
             {
+                Symbol[] capabilities = open.OfferedCapabilities;
+                if (capabilities != null)
+                {
+                    if (Array.Exists(capabilities,
+                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY)))
+                    {
+                        Info.AnonymousRelaySupported = true;
+                    }
+
+                    if (Array.Exists(capabilities,
+                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY)))
+                    {
+                        Info.DelayedDeliverySupported = true;
+                    }
+
+                    if (Array.Exists(capabilities,
+                        symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY)))
+                    {
+                        Info.DelayedDeliverySupported = true;
+                    }
+                }
+
+
+
                 object value = SymbolUtil.GetFromFields(open.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
                 if (value is string topicPrefix)
                 {
@@ -139,7 +166,6 @@
                 {
                     Info.QueuePrefix = queuePrefix;
                 }
-
                 this.tsc.TrySetResult(true);
                 Provider.FireConnectionEstablished();
             }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 7eb5d1c..7670d2f 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -42,6 +42,9 @@
         private ReceiverLink receiverLink;
         private readonly LinkedList<InboundMessageDispatch> messages;
         private readonly object syncRoot = new object();
+        
+        private bool validateSharedSubsLinkCapability;
+        private bool sharedSubsNotSupported;
 
         private readonly AmqpSession session;
         public IDestination Destination => info.Destination;
@@ -70,38 +73,95 @@
                 RcvSettleMode = ReceiverSettleMode.First,
                 SndSettleMode = (info.IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
             };
-            string name;
-            if (info.IsDurable)
+
+            string receiverLinkName = null;
+
+            string subscriptionName = info.SubscriptionName;
+            if (!string.IsNullOrEmpty(subscriptionName))
             {
-                name = info.SubscriptionName;
+                AmqpConnection connection = session.Connection;
+
+                if (info.IsShared && !connection.Info.SharedSubsSupported) {
+                    validateSharedSubsLinkCapability = true;
+                }
+
+                AmqpSubscriptionTracker subTracker = connection.SubscriptionTracker;
+
+                // Validate subscriber type allowed given existing active subscriber types.
+                if (info.IsShared && info.IsDurable) {
+                    if(subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
+                        // Don't allow shared sub if there is already an active exclusive durable sub
+                        throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+                    }
+                } else if (!info.IsShared && info.IsDurable) {
+                    if (subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
+                        // Exclusive durable sub is already active
+                        throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+                    } else if (subTracker.IsActiveSharedDurableSub(subscriptionName)) {
+                        // Don't allow exclusive durable sub if there is already an active shared durable sub
+                        throw new NMSException("A shared durable subscription is already active with name '" + subscriptionName + "'");
+                    }
+                }
+
+                // Get the link name for the subscription. Throws if certain further validations fail.
+                receiverLinkName = subTracker.ReserveNextSubscriptionLinkName(subscriptionName, info);
             }
-            else
-            {
+
+            
+            if (receiverLinkName == null) {
                 string destinationAddress = source.Address ?? "";
-                name = "nms:receiver:" + info.Id
-                                       + (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
+                receiverLinkName = "nms:receiver:" + info.Id
+                                                   + (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
             }
 
             // TODO: Add timeout
             var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
-            receiverLink = new ReceiverLink(session.UnderlyingSession, name, attach, HandleOpened(tsc));
+            receiverLink = new ReceiverLink(session.UnderlyingSession, receiverLinkName, attach, HandleOpened(tsc));
             receiverLink.AddClosedCallback(HandleClosed(tsc));
             return tsc.Task;
         }
 
         private OnAttached HandleOpened(TaskCompletionSource<bool> tsc) => (link, attach) =>
         {
+            if (validateSharedSubsLinkCapability)
+            {
+                Symbol[] remoteOfferedCapabilities = attach.OfferedCapabilities;
+
+                bool supported = false;
+                if (remoteOfferedCapabilities != null)
+                {
+                    if (Array.Exists(remoteOfferedCapabilities, symbol => symbol == SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS))
+                    {
+                        supported = true;
+                    }
+                }
+
+                if (!supported)
+                {
+                    sharedSubsNotSupported = true;
+
+                    if (info.IsDurable)
+                    {
+                        link.Detach(null);
+                    }
+                    else
+                    {
+                        link.Close();
+                    }
+                }
+            }
+
             if (IsClosePending(attach))
                 return;
 
             tsc.SetResult(true);
         };
 
-        private static bool IsClosePending(Attach attach)
+        private bool IsClosePending(Attach attach)
         {
             // When no link terminus was created, the peer will now detach/close us otherwise
             // we need to validate the returned remote source prior to open completion.
-            return attach.Source == null;
+            return sharedSubsNotSupported || attach.Source == null;
         }
 
         private ClosedCallback HandleClosed(TaskCompletionSource<bool> tsc) => (sender, error) =>
@@ -109,6 +169,7 @@
             NMSException exception = ExceptionSupport.GetException(error, "Received Amqp link detach with Error for link {0}", info.Id);
             if (!tsc.TrySetException(exception))
             {
+                session.Connection.SubscriptionTracker.ConsumerRemoved(info);
                 session.RemoveConsumer(info.Id);
 
                 // If session is not closed it means that the link was remotely detached 
@@ -143,6 +204,8 @@
                 source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
                 source.Durable = (int) TerminusDurability.NONE;
             }
+            
+            
 
             if (info.IsBrowser)
             {
@@ -363,7 +426,7 @@
 
         public bool HasSubscription(string subscriptionName)
         {
-            return info.IsDurable && info.SubscriptionName.Equals(subscriptionName);
+            return (info.IsDurable || info.IsShared) && info.SubscriptionName.Equals(subscriptionName);
         }
 
         public void PostRollback()
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index a6e8206..4cf72fa 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -136,13 +136,12 @@
 
                     var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
 
-                    if (envelope.SendAsync)
-                        return SendAsync(message, transactionalState);
-                    else
+                    if (envelope.FireAndForget)
                     {
                         SendSync(message, transactionalState);
                         return Task.CompletedTask;
                     }
+                    return SendAsync(message, transactionalState);
                 }
                 catch (TimeoutException tex)
                 {
@@ -160,6 +159,7 @@
                     throw ExceptionSupport.Wrap(ex);
                 }
             }
+            throw ExceptionSupport.GetException(this.senderLink, "unexpected enveloper");
         }
 
         private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
@@ -167,11 +167,24 @@
             senderLink.Send(message, deliveryState, null, null);
         }
         
-        private Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
+        private async Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
         {
             var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
-            senderLink.Send(message, deliveryState, _onOutcome, tcs);
-            return tcs.Task;
+            CancellationTokenSource cts = null;
+            if (session.Connection.Provider.SendTimeout != NmsConnectionInfo.INFINITE)
+            {
+                cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(session.Connection.Provider.SendTimeout));
+                cts.Token.Register(_ => tcs.TrySetCanceled(), null);
+            }
+            try
+            {
+                senderLink.Send(message, deliveryState, _onOutcome, tcs);
+                await tcs.Task.ConfigureAwait(false);
+            }
+            finally
+            {
+                cts?.Dispose();
+            }
         }
         
         private static void OnOutcome(ILink sender, global::Amqp.Message message, Outcome outcome, object state)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs
new file mode 100644
index 0000000..5679594
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs
@@ -0,0 +1,327 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.NMS.AMQP.Meta;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+    public class AmqpSubscriptionTracker
+    {
+
+        // Subscription Name Delimiter
+        public readonly static string SUB_NAME_DELIMITER = "|";
+
+        private readonly ISet<string> exclusiveDurableSubs = new HashSet<string>();
+
+        private readonly IDictionary<string, SubDetails> sharedDurableSubs =
+            new ConcurrentDictionary<string, SubDetails>();
+
+        private readonly IDictionary<string, SubDetails> sharedVolatileSubs =
+            new ConcurrentDictionary<string, SubDetails>();
+
+        public string ReserveNextSubscriptionLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+        {
+            ValidateSubscriptionName(subscriptionName);
+
+            if (consumerInfo == null)
+            {
+                throw new ArgumentException("Consumer info must not be null.");
+            }
+
+            if (consumerInfo.IsShared)
+            {
+                if (consumerInfo.IsDurable)
+                {
+                    return GetSharedDurableSubLinkName(subscriptionName, consumerInfo);
+                }
+                else
+                {
+                    return GetSharedVolatileSubLinkName(subscriptionName, consumerInfo);
+                }
+            }
+            else if (consumerInfo.IsDurable)
+            {
+                RegisterExclusiveDurableSub(subscriptionName);
+                return subscriptionName;
+            }
+            else
+            {
+                throw new IllegalStateException(
+                    "Non-shared non-durable sub link naming is not handled by the tracker.");
+            }
+        }
+
+        private void ValidateSubscriptionName(string subscriptionName)
+        {
+            if (string.IsNullOrEmpty(subscriptionName))
+            {
+                throw new ArgumentException("Subscription name must not be null or empty.");
+            }
+
+            if (subscriptionName.Contains(SUB_NAME_DELIMITER))
+            {
+                throw new ArgumentException(
+                    "Subscription name must not contain '" + SUB_NAME_DELIMITER + "' character.");
+            }
+        }
+
+        private string GetSharedDurableSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+        {
+            IDestination topic = consumerInfo.Destination;
+            string selector = consumerInfo.Selector;
+
+            SubDetails subDetails = null;
+            if (sharedDurableSubs.ContainsKey(subscriptionName))
+            {
+                subDetails = sharedDurableSubs[subscriptionName];
+
+                if (subDetails.Matches(topic, selector))
+                {
+                    subDetails.AddSubscriber(consumerInfo);
+                }
+                else
+                {
+                    throw new NMSException("Subscription details dont match existing subscriber.");
+                }
+            }
+            else
+            {
+                subDetails = new SubDetails(topic, selector, consumerInfo);
+            }
+
+            sharedDurableSubs.Add(subscriptionName, subDetails);
+
+            int count = subDetails.TotalSubscriberCount();
+
+            return GetDurableSubscriptionLinkName(subscriptionName, consumerInfo.IsExplicitClientId, count);
+        }
+
+        private string GetDurableSubscriptionLinkName(string subscriptionName, bool hasClientID, int count)
+        {
+            string linkName = GetFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
+            if (count > 1)
+            {
+                if (hasClientID)
+                {
+                    linkName += SUB_NAME_DELIMITER + count;
+                }
+                else
+                {
+                    linkName += count;
+                }
+            }
+
+            return linkName;
+        }
+
+        public string GetFirstDurableSubscriptionLinkName(string subscriptionName, bool hasClientID)
+        {
+            ValidateSubscriptionName(subscriptionName);
+
+            String receiverLinkName = subscriptionName;
+            if (!hasClientID)
+            {
+                receiverLinkName += SUB_NAME_DELIMITER + "global";
+            }
+
+            return receiverLinkName;
+        }
+
+        private String GetSharedVolatileSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+        {
+            IDestination topic = consumerInfo.Destination;
+            string selector = consumerInfo.Selector;
+
+            SubDetails subDetails = null;
+            if (sharedVolatileSubs.ContainsKey(subscriptionName))
+            {
+                subDetails = sharedVolatileSubs[subscriptionName];
+
+                if (subDetails.Matches(topic, selector))
+                {
+                    subDetails.AddSubscriber(consumerInfo);
+                }
+                else
+                {
+                    throw new NMSException("Subscription details dont match existing subscriber");
+                }
+            }
+            else
+            {
+                subDetails = new SubDetails(topic, selector, consumerInfo);
+            }
+
+            sharedVolatileSubs.Add(subscriptionName, subDetails);
+
+            string receiverLinkName = subscriptionName + SUB_NAME_DELIMITER;
+            int count = subDetails.TotalSubscriberCount();
+
+            if (consumerInfo.IsExplicitClientId)
+            {
+                receiverLinkName += "volatile" + count;
+            }
+            else
+            {
+                receiverLinkName += "global-volatile" + count;
+            }
+
+            return receiverLinkName;
+        }
+
+        private void RegisterExclusiveDurableSub(String subscriptionName)
+        {
+            exclusiveDurableSubs.Add(subscriptionName);
+        }
+
+        /**
+         * Checks if there is an exclusive durable subscription already
+         * recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is an exclusive durable sub with this name already active
+         */
+        public bool IsActiveExclusiveDurableSub(String subscriptionName)
+        {
+            return exclusiveDurableSubs.Contains(subscriptionName);
+        }
+
+        /**
+         * Checks if there is a shared durable subscription already
+         * recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is a shared durable sub with this name already active
+         */
+        public bool IsActiveSharedDurableSub(string subscriptionName)
+        {
+            return sharedDurableSubs.ContainsKey(subscriptionName);
+        }
+
+        /**
+         * Checks if there is either a shared or exclusive durable subscription
+         * already recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is a durable sub with this name already active
+         */
+        public bool IsActiveDurableSub(string subscriptionName)
+        {
+            return IsActiveExclusiveDurableSub(subscriptionName) || IsActiveSharedDurableSub(subscriptionName);
+        }
+
+        /**
+         * Checks if there is an shared volatile subscription already
+         * recorded as active with the given subscription name.
+         *
+         * @param subscriptionName name of subscription to check
+         * @return true if there is a shared volatile sub with this name already active
+         */
+        public bool IsActiveSharedVolatileSub(String subscriptionName)
+        {
+            return sharedVolatileSubs.ContainsKey(subscriptionName);
+        }
+
+        public void ConsumerRemoved(NmsConsumerInfo consumerInfo)
+        {
+            string subscriptionName = consumerInfo.SubscriptionName;
+
+            if (!string.IsNullOrEmpty(subscriptionName))
+            {
+                if (consumerInfo.IsShared)
+                {
+                    if (consumerInfo.IsDurable)
+                    {
+                        if (sharedDurableSubs.ContainsKey(subscriptionName))
+                        {
+                            SubDetails subDetails = sharedDurableSubs[subscriptionName];
+                            subDetails.RemoveSubscriber(consumerInfo);
+
+                            int count = subDetails.ActiveSubscribers();
+                            if (count < 1)
+                            {
+                                sharedDurableSubs.Remove(subscriptionName);
+                            }
+                        }
+                    }
+                    else
+                    {
+                        if (sharedVolatileSubs.ContainsKey(subscriptionName))
+                        {
+                            SubDetails subDetails = sharedVolatileSubs[subscriptionName];
+                            subDetails.RemoveSubscriber(consumerInfo);
+
+                            int count = subDetails.ActiveSubscribers();
+                            if (count < 1)
+                            {
+                                sharedVolatileSubs.Remove(subscriptionName);
+                            }
+                        }
+                    }
+                }
+                else if (consumerInfo.IsDurable)
+                {
+                    exclusiveDurableSubs.Remove(subscriptionName);
+                }
+            }
+        }
+
+        private class SubDetails
+        {
+            private IDestination topic = null;
+            private String selector = null;
+            private ISet<NmsConsumerInfo> subscribers = new HashSet<NmsConsumerInfo>();
+            private int totalSubscriberCount;
+
+            public SubDetails(IDestination topic, string selector, NmsConsumerInfo info)
+            {
+                this.topic = topic ?? throw new ArgumentException("Topic destination must not be null");
+                this.selector = selector;
+                AddSubscriber(info);
+            }
+
+            public void AddSubscriber(NmsConsumerInfo info)
+            {
+                if (info == null)
+                {
+                    throw new ArgumentException("Consumer info must not be null");
+                }
+
+                totalSubscriberCount++;
+                subscribers.Add(info);
+            }
+
+            public void RemoveSubscriber(NmsConsumerInfo info)
+            {
+                subscribers.Remove(info);
+            }
+
+            public int ActiveSubscribers()
+            {
+                return subscribers.Count;
+            }
+
+            public int TotalSubscriberCount()
+            {
+                return totalSubscriberCount;
+            }
+
+            public bool Matches(IDestination newTopic, string newSelector)
+            {
+                if (!topic.Equals(newTopic))
+                {
+                    return false;
+                }
+
+                if (selector == null)
+                {
+                    return newSelector == null;
+                }
+                else
+                {
+                    return selector.Equals(newSelector);
+                }
+            }
+
+        }
+    }
+}
\ No newline at end of file