AMQNET-637 Remove CompletionListener in favour of Task based Async, as well complete some left over bits on shared subscriptions and subscription tracking.
diff --git a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
index a4115c8..2aefdb1 100644
--- a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
+++ b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
@@ -25,6 +25,6 @@
public NmsProducerId ProducerId { get; set; }
public NmsProducerInfo ProducerInfo { get; set; }
public NmsMessage Message { get; set; }
- public bool SendAsync { get; set; }
+ public bool FireAndForget { get; set; }
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index 24f3c13..f57c5d4 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -64,6 +64,12 @@
public int MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
public int IdleTimeOut { get; set; } = DEFAULT_IDLE_TIMEOUT;
+ public bool AnonymousRelaySupported { get; set; }
+
+ public bool DelayedDeliverySupported { get; set; }
+
+ public bool SharedSubsSupported { get; set; }
+
public void SetClientId(string clientId, bool explicitClientId)
{
diff --git a/src/NMS.AMQP/NmsContext.cs b/src/NMS.AMQP/NmsContext.cs
index 882f583..260647e 100644
--- a/src/NMS.AMQP/NmsContext.cs
+++ b/src/NMS.AMQP/NmsContext.cs
@@ -86,6 +86,7 @@
return new NmsProducer(GetSession(), sharedProducer);
}
+
public INMSConsumer CreateConsumer(IDestination destination)
{
return StartIfNeeded(new NmsConsumer(GetSession(), (NmsMessageConsumer) GetSession().CreateConsumer(destination)));
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index 20a706c..6257177 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -85,73 +85,30 @@
public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
- Send(destination, message, deliveryMode, priority, timeToLive, null);
- }
-
- public void Send(IMessage message, CompletionListener completionListener)
- {
- Send(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
- }
-
- public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive,
- CompletionListener completionListener)
- {
- Send(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
- }
-
- public void Send(IDestination destination, IMessage message, CompletionListener completionListener)
- {
- Send(destination, message, deliveryMode, priority, timeToLive, completionListener);
- }
-
- public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
- TimeSpan timeToLive, CompletionListener completionListener)
- {
CheckClosed();
- session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay, completionListener);
+ session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay);
}
public Task SendAsync(IMessage message)
{
return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
}
-
- public Task SendAsync(IMessage message, CompletionListener completionListener)
- {
- return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
- }
public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive);
}
-
- public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, CompletionListener completionListener)
- {
- return SendAsync(Info.Destination, message, deliveryMode, priority, timeToLive, completionListener);
- }
public Task SendAsync(IDestination destination, IMessage message)
{
return SendAsync(destination, message, deliveryMode, priority, timeToLive);
}
-
- public Task SendAsync(IDestination destination, IMessage message, CompletionListener completionListener)
- {
- return SendAsync(destination, message, deliveryMode, priority, timeToLive, completionListener);
- }
-
+
public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
TimeSpan timeToLive)
{
- return SendAsync(destination, message, deliveryMode, priority, timeToLive, null);
- }
-
- public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
- TimeSpan timeToLive, CompletionListener completionListener)
- {
CheckClosed();
- return session.SendAsync(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay, null);
+ return session.SendAsync(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp, deliveryDelay);
}
public void Close()
diff --git a/src/NMS.AMQP/NmsProducer.cs b/src/NMS.AMQP/NmsProducer.cs
index bc6ed17..974a185 100644
--- a/src/NMS.AMQP/NmsProducer.cs
+++ b/src/NMS.AMQP/NmsProducer.cs
@@ -28,9 +28,7 @@
private readonly ISession session;
private readonly NmsMessageProducer producer;
-
- private CompletionListener completionListener;
-
+
// Message Headers
private String correlationId;
private String type;
@@ -79,7 +77,7 @@
message.NMSReplyTo = replyTo;
}
- producer.Send(destination, message, completionListener);
+ producer.Send(destination, message);
return this;
}
@@ -106,12 +104,12 @@
return Send(destination, CreateObjectMessage(body));
}
- public Task<INMSProducer> SendAsync(IDestination destination, IMessage message)
+ public async Task<INMSProducer> SendAsync(IDestination destination, IMessage message)
{
if (message == null) {
throw new MessageFormatException("Message must not be null");
}
-
+
NmsMessageTransformation.CopyMap(messageProperties, message.Properties);
if (correlationId != null) {
@@ -124,8 +122,8 @@
message.NMSReplyTo = replyTo;
}
- Task task = producer.SendAsync(destination, message, completionListener);
- return task.ContinueWith(t => (INMSProducer) this);
+ await producer.SendAsync(destination, message);
+ return this;
}
public Task<INMSProducer> SendAsync(IDestination destination, string body)
@@ -149,18 +147,6 @@
{
return SendAsync(destination, CreateObjectMessage(body));
}
-
- public CompletionListener CompletionListener
- {
- get => completionListener;
- set => completionListener = value;
- }
-
- public INMSProducer SetCompletionListener(CompletionListener completionListener)
- {
- CompletionListener = completionListener;
- return this;
- }
public INMSProducer ClearProperties()
{
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index e90db35..e1bb898 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -452,18 +452,16 @@
public void Send(NmsMessageProducer producer, IDestination destination, IMessage original,
MsgDeliveryMode deliveryMode,
- MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay, CompletionListener completionListener)
+ MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay)
{
- Task task = SendAsync(producer, destination, original, deliveryMode, priority, timeToLive, disableMessageId,
- disableMessageTimestamp, deliveryDelay, completionListener);
- if (completionListener == null)
- {
- task.ConfigureAwait(false).GetAwaiter().GetResult();
- }
+
+ SendAsync(producer, destination, original, deliveryMode, priority, timeToLive, disableMessageId,
+ disableMessageTimestamp, deliveryDelay).ConfigureAwait(false).GetAwaiter().GetResult();
+
}
public Task SendAsync(NmsMessageProducer producer, IDestination destination, IMessage original, MsgDeliveryMode deliveryMode,
- MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay, CompletionListener completionListener)
+ MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp, TimeSpan deliveryDelay)
{
if (destination == null)
throw new InvalidDestinationException("Destination must not be null");
@@ -525,34 +523,15 @@
outbound.OnSend(timeToLive);
- bool sync = deliveryMode == MsgDeliveryMode.Persistent;
+ bool fireAndForget = deliveryMode == MsgDeliveryMode.NonPersistent;
- Task task = TransactionContext.Send(new OutboundMessageDispatch
+ return TransactionContext.Send(new OutboundMessageDispatch
{
Message = outbound,
ProducerId = producer.Info.Id,
ProducerInfo = producer.Info,
- SendAsync = !sync
+ FireAndForget = fireAndForget
});
-
- if (completionListener == null)
- {
- return task;
- }
- else
- {
- return task.ContinueWith(t =>
- {
- if (t.IsFaulted)
- completionListener.Invoke(original, t.Exception.InnerException);
- else if (t.IsCanceled)
- completionListener.Invoke(original, new TaskCanceledException());
- else
- completionListener.Invoke(original, null);
-
- });
- }
-
}
internal void EnqueueForDispatch(NmsMessageConsumer.MessageDeliveryTask task)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index a90e1a7..1e86ced 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -21,6 +21,7 @@
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
+using Amqp.Types;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider.Amqp.Message;
@@ -63,6 +64,8 @@
public string TopicPrefix => Info.TopicPrefix;
public bool ObjectMessageUsesAmqpTypes { get; set; } = false;
public NmsConnectionInfo Info { get; }
+
+ public AmqpSubscriptionTracker SubscriptionTracker { get; } = new AmqpSubscriptionTracker();
public INmsMessageFactory MessageFactory => messageFactory;
@@ -128,6 +131,30 @@
}
else
{
+ Symbol[] capabilities = open.OfferedCapabilities;
+ if (capabilities != null)
+ {
+ if (Array.Exists(capabilities,
+ symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY)))
+ {
+ Info.AnonymousRelaySupported = true;
+ }
+
+ if (Array.Exists(capabilities,
+ symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY)))
+ {
+ Info.DelayedDeliverySupported = true;
+ }
+
+ if (Array.Exists(capabilities,
+ symbol => Equals(symbol, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY)))
+ {
+ Info.DelayedDeliverySupported = true;
+ }
+ }
+
+
+
object value = SymbolUtil.GetFromFields(open.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
if (value is string topicPrefix)
{
@@ -139,7 +166,6 @@
{
Info.QueuePrefix = queuePrefix;
}
-
this.tsc.TrySetResult(true);
Provider.FireConnectionEstablished();
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 7eb5d1c..7670d2f 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -42,6 +42,9 @@
private ReceiverLink receiverLink;
private readonly LinkedList<InboundMessageDispatch> messages;
private readonly object syncRoot = new object();
+
+ private bool validateSharedSubsLinkCapability;
+ private bool sharedSubsNotSupported;
private readonly AmqpSession session;
public IDestination Destination => info.Destination;
@@ -70,38 +73,95 @@
RcvSettleMode = ReceiverSettleMode.First,
SndSettleMode = (info.IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
};
- string name;
- if (info.IsDurable)
+
+ string receiverLinkName = null;
+
+ string subscriptionName = info.SubscriptionName;
+ if (!string.IsNullOrEmpty(subscriptionName))
{
- name = info.SubscriptionName;
+ AmqpConnection connection = session.Connection;
+
+ if (info.IsShared && !connection.Info.SharedSubsSupported) {
+ validateSharedSubsLinkCapability = true;
+ }
+
+ AmqpSubscriptionTracker subTracker = connection.SubscriptionTracker;
+
+ // Validate subscriber type allowed given existing active subscriber types.
+ if (info.IsShared && info.IsDurable) {
+ if(subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
+ // Don't allow shared sub if there is already an active exclusive durable sub
+ throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+ }
+ } else if (!info.IsShared && info.IsDurable) {
+ if (subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
+ // Exclusive durable sub is already active
+ throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+ } else if (subTracker.IsActiveSharedDurableSub(subscriptionName)) {
+ // Don't allow exclusive durable sub if there is already an active shared durable sub
+ throw new NMSException("A shared durable subscription is already active with name '" + subscriptionName + "'");
+ }
+ }
+
+ // Get the link name for the subscription. Throws if certain further validations fail.
+ receiverLinkName = subTracker.ReserveNextSubscriptionLinkName(subscriptionName, info);
}
- else
- {
+
+
+ if (receiverLinkName == null) {
string destinationAddress = source.Address ?? "";
- name = "nms:receiver:" + info.Id
- + (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
+ receiverLinkName = "nms:receiver:" + info.Id
+ + (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
}
// TODO: Add timeout
var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
- receiverLink = new ReceiverLink(session.UnderlyingSession, name, attach, HandleOpened(tsc));
+ receiverLink = new ReceiverLink(session.UnderlyingSession, receiverLinkName, attach, HandleOpened(tsc));
receiverLink.AddClosedCallback(HandleClosed(tsc));
return tsc.Task;
}
private OnAttached HandleOpened(TaskCompletionSource<bool> tsc) => (link, attach) =>
{
+ if (validateSharedSubsLinkCapability)
+ {
+ Symbol[] remoteOfferedCapabilities = attach.OfferedCapabilities;
+
+ bool supported = false;
+ if (remoteOfferedCapabilities != null)
+ {
+ if (Array.Exists(remoteOfferedCapabilities, symbol => symbol == SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS))
+ {
+ supported = true;
+ }
+ }
+
+ if (!supported)
+ {
+ sharedSubsNotSupported = true;
+
+ if (info.IsDurable)
+ {
+ link.Detach(null);
+ }
+ else
+ {
+ link.Close();
+ }
+ }
+ }
+
if (IsClosePending(attach))
return;
tsc.SetResult(true);
};
- private static bool IsClosePending(Attach attach)
+ private bool IsClosePending(Attach attach)
{
// When no link terminus was created, the peer will now detach/close us otherwise
// we need to validate the returned remote source prior to open completion.
- return attach.Source == null;
+ return sharedSubsNotSupported || attach.Source == null;
}
private ClosedCallback HandleClosed(TaskCompletionSource<bool> tsc) => (sender, error) =>
@@ -109,6 +169,7 @@
NMSException exception = ExceptionSupport.GetException(error, "Received Amqp link detach with Error for link {0}", info.Id);
if (!tsc.TrySetException(exception))
{
+ session.Connection.SubscriptionTracker.ConsumerRemoved(info);
session.RemoveConsumer(info.Id);
// If session is not closed it means that the link was remotely detached
@@ -143,6 +204,8 @@
source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
source.Durable = (int) TerminusDurability.NONE;
}
+
+
if (info.IsBrowser)
{
@@ -363,7 +426,7 @@
public bool HasSubscription(string subscriptionName)
{
- return info.IsDurable && info.SubscriptionName.Equals(subscriptionName);
+ return (info.IsDurable || info.IsShared) && info.SubscriptionName.Equals(subscriptionName);
}
public void PostRollback()
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index a6e8206..4cf72fa 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -136,13 +136,12 @@
var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
- if (envelope.SendAsync)
- return SendAsync(message, transactionalState);
- else
+ if (envelope.FireAndForget)
{
SendSync(message, transactionalState);
return Task.CompletedTask;
}
+ return SendAsync(message, transactionalState);
}
catch (TimeoutException tex)
{
@@ -160,6 +159,7 @@
throw ExceptionSupport.Wrap(ex);
}
}
+ throw ExceptionSupport.GetException(this.senderLink, "unexpected enveloper");
}
private void SendSync(global::Amqp.Message message, DeliveryState deliveryState)
@@ -167,11 +167,24 @@
senderLink.Send(message, deliveryState, null, null);
}
- private Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
+ private async Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
- senderLink.Send(message, deliveryState, _onOutcome, tcs);
- return tcs.Task;
+ CancellationTokenSource cts = null;
+ if (session.Connection.Provider.SendTimeout != NmsConnectionInfo.INFINITE)
+ {
+ cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(session.Connection.Provider.SendTimeout));
+ cts.Token.Register(_ => tcs.TrySetCanceled(), null);
+ }
+ try
+ {
+ senderLink.Send(message, deliveryState, _onOutcome, tcs);
+ await tcs.Task.ConfigureAwait(false);
+ }
+ finally
+ {
+ cts?.Dispose();
+ }
}
private static void OnOutcome(ILink sender, global::Amqp.Message message, Outcome outcome, object state)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs
new file mode 100644
index 0000000..5679594
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSubscriptionTracker.cs
@@ -0,0 +1,327 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.NMS.AMQP.Meta;
+
+namespace Apache.NMS.AMQP.Provider.Amqp
+{
+ public class AmqpSubscriptionTracker
+ {
+
+ // Subscription Name Delimiter
+ public readonly static string SUB_NAME_DELIMITER = "|";
+
+ private readonly ISet<string> exclusiveDurableSubs = new HashSet<string>();
+
+ private readonly IDictionary<string, SubDetails> sharedDurableSubs =
+ new ConcurrentDictionary<string, SubDetails>();
+
+ private readonly IDictionary<string, SubDetails> sharedVolatileSubs =
+ new ConcurrentDictionary<string, SubDetails>();
+
+ public string ReserveNextSubscriptionLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+ {
+ ValidateSubscriptionName(subscriptionName);
+
+ if (consumerInfo == null)
+ {
+ throw new ArgumentException("Consumer info must not be null.");
+ }
+
+ if (consumerInfo.IsShared)
+ {
+ if (consumerInfo.IsDurable)
+ {
+ return GetSharedDurableSubLinkName(subscriptionName, consumerInfo);
+ }
+ else
+ {
+ return GetSharedVolatileSubLinkName(subscriptionName, consumerInfo);
+ }
+ }
+ else if (consumerInfo.IsDurable)
+ {
+ RegisterExclusiveDurableSub(subscriptionName);
+ return subscriptionName;
+ }
+ else
+ {
+ throw new IllegalStateException(
+ "Non-shared non-durable sub link naming is not handled by the tracker.");
+ }
+ }
+
+ private void ValidateSubscriptionName(string subscriptionName)
+ {
+ if (string.IsNullOrEmpty(subscriptionName))
+ {
+ throw new ArgumentException("Subscription name must not be null or empty.");
+ }
+
+ if (subscriptionName.Contains(SUB_NAME_DELIMITER))
+ {
+ throw new ArgumentException(
+ "Subscription name must not contain '" + SUB_NAME_DELIMITER + "' character.");
+ }
+ }
+
+ private string GetSharedDurableSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+ {
+ IDestination topic = consumerInfo.Destination;
+ string selector = consumerInfo.Selector;
+
+ SubDetails subDetails = null;
+ if (sharedDurableSubs.ContainsKey(subscriptionName))
+ {
+ subDetails = sharedDurableSubs[subscriptionName];
+
+ if (subDetails.Matches(topic, selector))
+ {
+ subDetails.AddSubscriber(consumerInfo);
+ }
+ else
+ {
+ throw new NMSException("Subscription details dont match existing subscriber.");
+ }
+ }
+ else
+ {
+ subDetails = new SubDetails(topic, selector, consumerInfo);
+ }
+
+ sharedDurableSubs.Add(subscriptionName, subDetails);
+
+ int count = subDetails.TotalSubscriberCount();
+
+ return GetDurableSubscriptionLinkName(subscriptionName, consumerInfo.IsExplicitClientId, count);
+ }
+
+ private string GetDurableSubscriptionLinkName(string subscriptionName, bool hasClientID, int count)
+ {
+ string linkName = GetFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
+ if (count > 1)
+ {
+ if (hasClientID)
+ {
+ linkName += SUB_NAME_DELIMITER + count;
+ }
+ else
+ {
+ linkName += count;
+ }
+ }
+
+ return linkName;
+ }
+
+ public string GetFirstDurableSubscriptionLinkName(string subscriptionName, bool hasClientID)
+ {
+ ValidateSubscriptionName(subscriptionName);
+
+ String receiverLinkName = subscriptionName;
+ if (!hasClientID)
+ {
+ receiverLinkName += SUB_NAME_DELIMITER + "global";
+ }
+
+ return receiverLinkName;
+ }
+
+ private String GetSharedVolatileSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
+ {
+ IDestination topic = consumerInfo.Destination;
+ string selector = consumerInfo.Selector;
+
+ SubDetails subDetails = null;
+ if (sharedVolatileSubs.ContainsKey(subscriptionName))
+ {
+ subDetails = sharedVolatileSubs[subscriptionName];
+
+ if (subDetails.Matches(topic, selector))
+ {
+ subDetails.AddSubscriber(consumerInfo);
+ }
+ else
+ {
+ throw new NMSException("Subscription details dont match existing subscriber");
+ }
+ }
+ else
+ {
+ subDetails = new SubDetails(topic, selector, consumerInfo);
+ }
+
+ sharedVolatileSubs.Add(subscriptionName, subDetails);
+
+ string receiverLinkName = subscriptionName + SUB_NAME_DELIMITER;
+ int count = subDetails.TotalSubscriberCount();
+
+ if (consumerInfo.IsExplicitClientId)
+ {
+ receiverLinkName += "volatile" + count;
+ }
+ else
+ {
+ receiverLinkName += "global-volatile" + count;
+ }
+
+ return receiverLinkName;
+ }
+
+ private void RegisterExclusiveDurableSub(String subscriptionName)
+ {
+ exclusiveDurableSubs.Add(subscriptionName);
+ }
+
+ /**
+ * Checks if there is an exclusive durable subscription already
+ * recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is an exclusive durable sub with this name already active
+ */
+ public bool IsActiveExclusiveDurableSub(String subscriptionName)
+ {
+ return exclusiveDurableSubs.Contains(subscriptionName);
+ }
+
+ /**
+ * Checks if there is a shared durable subscription already
+ * recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is a shared durable sub with this name already active
+ */
+ public bool IsActiveSharedDurableSub(string subscriptionName)
+ {
+ return sharedDurableSubs.ContainsKey(subscriptionName);
+ }
+
+ /**
+ * Checks if there is either a shared or exclusive durable subscription
+ * already recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is a durable sub with this name already active
+ */
+ public bool IsActiveDurableSub(string subscriptionName)
+ {
+ return IsActiveExclusiveDurableSub(subscriptionName) || IsActiveSharedDurableSub(subscriptionName);
+ }
+
+ /**
+ * Checks if there is an shared volatile subscription already
+ * recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is a shared volatile sub with this name already active
+ */
+ public bool IsActiveSharedVolatileSub(String subscriptionName)
+ {
+ return sharedVolatileSubs.ContainsKey(subscriptionName);
+ }
+
+ public void ConsumerRemoved(NmsConsumerInfo consumerInfo)
+ {
+ string subscriptionName = consumerInfo.SubscriptionName;
+
+ if (!string.IsNullOrEmpty(subscriptionName))
+ {
+ if (consumerInfo.IsShared)
+ {
+ if (consumerInfo.IsDurable)
+ {
+ if (sharedDurableSubs.ContainsKey(subscriptionName))
+ {
+ SubDetails subDetails = sharedDurableSubs[subscriptionName];
+ subDetails.RemoveSubscriber(consumerInfo);
+
+ int count = subDetails.ActiveSubscribers();
+ if (count < 1)
+ {
+ sharedDurableSubs.Remove(subscriptionName);
+ }
+ }
+ }
+ else
+ {
+ if (sharedVolatileSubs.ContainsKey(subscriptionName))
+ {
+ SubDetails subDetails = sharedVolatileSubs[subscriptionName];
+ subDetails.RemoveSubscriber(consumerInfo);
+
+ int count = subDetails.ActiveSubscribers();
+ if (count < 1)
+ {
+ sharedVolatileSubs.Remove(subscriptionName);
+ }
+ }
+ }
+ }
+ else if (consumerInfo.IsDurable)
+ {
+ exclusiveDurableSubs.Remove(subscriptionName);
+ }
+ }
+ }
+
+ private class SubDetails
+ {
+ private IDestination topic = null;
+ private String selector = null;
+ private ISet<NmsConsumerInfo> subscribers = new HashSet<NmsConsumerInfo>();
+ private int totalSubscriberCount;
+
+ public SubDetails(IDestination topic, string selector, NmsConsumerInfo info)
+ {
+ this.topic = topic ?? throw new ArgumentException("Topic destination must not be null");
+ this.selector = selector;
+ AddSubscriber(info);
+ }
+
+ public void AddSubscriber(NmsConsumerInfo info)
+ {
+ if (info == null)
+ {
+ throw new ArgumentException("Consumer info must not be null");
+ }
+
+ totalSubscriberCount++;
+ subscribers.Add(info);
+ }
+
+ public void RemoveSubscriber(NmsConsumerInfo info)
+ {
+ subscribers.Remove(info);
+ }
+
+ public int ActiveSubscribers()
+ {
+ return subscribers.Count;
+ }
+
+ public int TotalSubscriberCount()
+ {
+ return totalSubscriberCount;
+ }
+
+ public bool Matches(IDestination newTopic, string newSelector)
+ {
+ if (!topic.Equals(newTopic))
+ {
+ return false;
+ }
+
+ if (selector == null)
+ {
+ return newSelector == null;
+ }
+ else
+ {
+ return selector.Equals(newSelector);
+ }
+ }
+
+ }
+ }
+}
\ No newline at end of file