// Source blob: 56795942fe171f464d42164f15a134c38d1575b2
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Apache.NMS.AMQP.Meta;
namespace Apache.NMS.AMQP.Provider.Amqp
{
    /// <summary>
    /// Tracks the named subscriptions (exclusive durable, shared durable and
    /// shared volatile) that are currently in use and derives the AMQP receiver
    /// link name to use for each new subscriber.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the shared subscription maps are concurrent dictionaries but
    /// the exclusive durable set is a plain <see cref="HashSet{T}"/>; callers are
    /// presumably expected to serialise access to the tracker - confirm.
    /// </remarks>
    public class AmqpSubscriptionTracker
    {
        // Subscription Name Delimiter
        public readonly static string SUB_NAME_DELIMITER = "|";

        private readonly ISet<string> exclusiveDurableSubs = new HashSet<string>();

        private readonly IDictionary<string, SubDetails> sharedDurableSubs =
            new ConcurrentDictionary<string, SubDetails>();

        private readonly IDictionary<string, SubDetails> sharedVolatileSubs =
            new ConcurrentDictionary<string, SubDetails>();

        /// <summary>
        /// Records a new subscriber for the given subscription name and returns
        /// the receiver link name that subscriber should use.
        /// </summary>
        /// <param name="subscriptionName">name of the subscription being joined or created</param>
        /// <param name="consumerInfo">description of the consumer that is subscribing</param>
        /// <returns>the link name reserved for the consumer</returns>
        /// <exception cref="ArgumentException">
        /// if the subscription name is null, empty or contains
        /// <see cref="SUB_NAME_DELIMITER"/>, or if <paramref name="consumerInfo"/> is null
        /// </exception>
        /// <exception cref="IllegalStateException">
        /// if the consumer is neither shared nor durable
        /// </exception>
        /// <exception cref="NMSException">
        /// if a shared subscription with this name already exists with a different
        /// topic or selector
        /// </exception>
        public string ReserveNextSubscriptionLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
        {
            ValidateSubscriptionName(subscriptionName);

            if (consumerInfo == null)
            {
                throw new ArgumentException("Consumer info must not be null.");
            }

            if (consumerInfo.IsShared)
            {
                if (consumerInfo.IsDurable)
                {
                    return GetSharedDurableSubLinkName(subscriptionName, consumerInfo);
                }

                return GetSharedVolatileSubLinkName(subscriptionName, consumerInfo);
            }
            else if (consumerInfo.IsDurable)
            {
                // Exclusive durable subscriptions use the subscription name as-is.
                RegisterExclusiveDurableSub(subscriptionName);
                return subscriptionName;
            }

            throw new IllegalStateException(
                "Non-shared non-durable sub link naming is not handled by the tracker.");
        }

        /// <summary>
        /// Rejects subscription names that are null, empty, or that contain the
        /// delimiter used when composing link names.
        /// </summary>
        /// <param name="subscriptionName">name to validate</param>
        /// <exception cref="ArgumentException">if the name is not acceptable</exception>
        private void ValidateSubscriptionName(string subscriptionName)
        {
            if (string.IsNullOrEmpty(subscriptionName))
            {
                throw new ArgumentException("Subscription name must not be null or empty.");
            }

            if (subscriptionName.Contains(SUB_NAME_DELIMITER))
            {
                throw new ArgumentException(
                    "Subscription name must not contain '" + SUB_NAME_DELIMITER + "' character.");
            }
        }

        /// <summary>
        /// Registers the consumer against the shared durable subscription and
        /// builds its link name from the running subscriber count.
        /// </summary>
        private string GetSharedDurableSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
        {
            SubDetails subDetails = AddSharedSubscriber(
                sharedDurableSubs,
                subscriptionName,
                consumerInfo,
                "Subscription details dont match existing subscriber.");

            int count = subDetails.TotalSubscriberCount();
            return GetDurableSubscriptionLinkName(subscriptionName, consumerInfo.IsExplicitClientId, count);
        }

        /// <summary>
        /// Builds the link name for the <paramref name="count"/>-th subscriber of a
        /// durable subscription. The first subscriber gets the base name; later
        /// ones get the count appended (delimited only when a client ID is set).
        /// </summary>
        private string GetDurableSubscriptionLinkName(string subscriptionName, bool hasClientID, int count)
        {
            string linkName = GetFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);

            if (count > 1)
            {
                if (hasClientID)
                {
                    linkName += SUB_NAME_DELIMITER + count;
                }
                else
                {
                    // The name already ends in "|global", so the count is appended directly.
                    linkName += count;
                }
            }

            return linkName;
        }

        /// <summary>
        /// Returns the link name used by the first subscriber of a durable
        /// subscription: the subscription name itself, suffixed with
        /// <c>|global</c> when no explicit client ID is in use.
        /// </summary>
        /// <param name="subscriptionName">name of the subscription</param>
        /// <param name="hasClientID">whether the connection has an explicit client ID</param>
        /// <returns>the first durable link name for the subscription</returns>
        public string GetFirstDurableSubscriptionLinkName(string subscriptionName, bool hasClientID)
        {
            String receiverLinkName = subscriptionName;

            if (!hasClientID)
            {
                receiverLinkName += SUB_NAME_DELIMITER + "global";
            }

            return receiverLinkName;
        }

        /// <summary>
        /// Registers the consumer against the shared volatile subscription and
        /// builds its link name from the running subscriber count.
        /// </summary>
        private String GetSharedVolatileSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
        {
            SubDetails subDetails = AddSharedSubscriber(
                sharedVolatileSubs,
                subscriptionName,
                consumerInfo,
                "Subscription details dont match existing subscriber");

            string receiverLinkName = subscriptionName + SUB_NAME_DELIMITER;
            int count = subDetails.TotalSubscriberCount();

            if (consumerInfo.IsExplicitClientId)
            {
                receiverLinkName += "volatile" + count;
            }
            else
            {
                receiverLinkName += "global-volatile" + count;
            }

            return receiverLinkName;
        }

        /// <summary>
        /// Adds the consumer to the named entry of <paramref name="subs"/>, creating
        /// the entry when it does not exist yet.
        /// </summary>
        /// <exception cref="NMSException">
        /// if an entry exists whose topic or selector differs from the consumer's
        /// </exception>
        private static SubDetails AddSharedSubscriber(
            IDictionary<string, SubDetails> subs,
            string subscriptionName,
            NmsConsumerInfo consumerInfo,
            string mismatchMessage)
        {
            IDestination topic = consumerInfo.Destination;
            string selector = consumerInfo.Selector;

            if (subs.TryGetValue(subscriptionName, out SubDetails subDetails))
            {
                if (!subDetails.Matches(topic, selector))
                {
                    throw new NMSException(mismatchMessage);
                }

                subDetails.AddSubscriber(consumerInfo);
            }
            else
            {
                // The SubDetails constructor records the first subscriber itself.
                subDetails = new SubDetails(topic, selector, consumerInfo);
                subs.Add(subscriptionName, subDetails);
            }

            return subDetails;
        }

        /// <summary>
        /// Records the given name as an active exclusive durable subscription.
        /// </summary>
        private void RegisterExclusiveDurableSub(String subscriptionName)
        {
            exclusiveDurableSubs.Add(subscriptionName);
        }

        /// <summary>
        /// Checks if there is an exclusive durable subscription already
        /// recorded as active with the given subscription name.
        /// </summary>
        /// <param name="subscriptionName">name of subscription to check</param>
        /// <returns>true if there is an exclusive durable sub with this name already active</returns>
        public bool IsActiveExclusiveDurableSub(String subscriptionName)
        {
            return exclusiveDurableSubs.Contains(subscriptionName);
        }

        /// <summary>
        /// Checks if there is a shared durable subscription already
        /// recorded as active with the given subscription name.
        /// </summary>
        /// <param name="subscriptionName">name of subscription to check</param>
        /// <returns>true if there is a shared durable sub with this name already active</returns>
        public bool IsActiveSharedDurableSub(string subscriptionName)
        {
            return sharedDurableSubs.ContainsKey(subscriptionName);
        }

        /// <summary>
        /// Checks if there is either a shared or exclusive durable subscription
        /// already recorded as active with the given subscription name.
        /// </summary>
        /// <param name="subscriptionName">name of subscription to check</param>
        /// <returns>true if there is a durable sub with this name already active</returns>
        public bool IsActiveDurableSub(string subscriptionName)
        {
            return IsActiveExclusiveDurableSub(subscriptionName) || IsActiveSharedDurableSub(subscriptionName);
        }

        /// <summary>
        /// Checks if there is a shared volatile subscription already
        /// recorded as active with the given subscription name.
        /// </summary>
        /// <param name="subscriptionName">name of subscription to check</param>
        /// <returns>true if there is a shared volatile sub with this name already active</returns>
        public bool IsActiveSharedVolatileSub(String subscriptionName)
        {
            return sharedVolatileSubs.ContainsKey(subscriptionName);
        }

        /// <summary>
        /// Releases whatever the tracker recorded for the given consumer. Consumers
        /// without a subscription name are ignored; a shared subscription entry is
        /// dropped once its last active subscriber has been removed.
        /// </summary>
        /// <param name="consumerInfo">description of the consumer that was removed</param>
        public void ConsumerRemoved(NmsConsumerInfo consumerInfo)
        {
            string subscriptionName = consumerInfo.SubscriptionName;

            if (string.IsNullOrEmpty(subscriptionName))
            {
                return;
            }

            if (consumerInfo.IsShared)
            {
                IDictionary<string, SubDetails> subs =
                    consumerInfo.IsDurable ? sharedDurableSubs : sharedVolatileSubs;
                RemoveSharedSubscriber(subs, subscriptionName, consumerInfo);
            }
            else if (consumerInfo.IsDurable)
            {
                exclusiveDurableSubs.Remove(subscriptionName);
            }
        }

        /// <summary>
        /// Removes the consumer from the named entry of <paramref name="subs"/> and
        /// deletes the entry when no active subscribers remain.
        /// </summary>
        private static void RemoveSharedSubscriber(
            IDictionary<string, SubDetails> subs,
            string subscriptionName,
            NmsConsumerInfo consumerInfo)
        {
            if (!subs.TryGetValue(subscriptionName, out SubDetails subDetails))
            {
                return;
            }

            subDetails.RemoveSubscriber(consumerInfo);

            if (subDetails.ActiveSubscribers() < 1)
            {
                subs.Remove(subscriptionName);
            }
        }

        /// <summary>
        /// Bookkeeping for one shared subscription: the topic and selector it was
        /// created with, the currently active subscribers, and a running total of
        /// every subscriber ever added (used to number link names).
        /// </summary>
        private class SubDetails
        {
            private readonly IDestination topic = null;
            private readonly String selector = null;

            // NOTE(review): membership relies on NmsConsumerInfo equality semantics - confirm.
            private readonly ISet<NmsConsumerInfo> subscribers = new HashSet<NmsConsumerInfo>();

            // Only ever incremented, so link name suffixes are not reused after a removal.
            private int totalSubscriberCount;

            /// <summary>
            /// Creates the details for a new subscription and records
            /// <paramref name="info"/> as its first subscriber.
            /// </summary>
            /// <exception cref="ArgumentException">if the topic or consumer info is null</exception>
            public SubDetails(IDestination topic, string selector, NmsConsumerInfo info)
            {
                this.topic = topic ?? throw new ArgumentException("Topic destination must not be null");
                this.selector = selector;
                AddSubscriber(info);
            }

            /// <summary>
            /// Adds an active subscriber and bumps the running total.
            /// </summary>
            /// <exception cref="ArgumentException">if <paramref name="info"/> is null</exception>
            public void AddSubscriber(NmsConsumerInfo info)
            {
                if (info == null)
                {
                    throw new ArgumentException("Consumer info must not be null");
                }

                totalSubscriberCount++;
                subscribers.Add(info);
            }

            /// <summary>
            /// Removes a subscriber from the active set; the running total is unchanged.
            /// </summary>
            public void RemoveSubscriber(NmsConsumerInfo info)
            {
                subscribers.Remove(info);
            }

            /// <summary>Number of subscribers currently active.</summary>
            public int ActiveSubscribers()
            {
                return subscribers.Count;
            }

            /// <summary>Number of subscribers ever added, including removed ones.</summary>
            public int TotalSubscriberCount()
            {
                return totalSubscriberCount;
            }

            /// <summary>
            /// Tells whether the given topic and selector are the ones this
            /// subscription was created with. A null selector only matches null.
            /// </summary>
            public bool Matches(IDestination newTopic, string newSelector)
            {
                if (!topic.Equals(newTopic))
                {
                    return false;
                }

                if (selector == null)
                {
                    return newSelector == null;
                }

                return selector.Equals(newSelector);
            }
        }
    }
}