// blob: 56795942fe171f464d42164f15a134c38d1575b2 (repository viewer header, kept as a comment so the file remains valid C#)
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Apache.NMS.AMQP.Meta;
namespace Apache.NMS.AMQP.Provider.Amqp
{
public class AmqpSubscriptionTracker
{
/// <summary>
/// Delimiter placed between a subscription name and any suffix appended to it when
/// a link name is built. Subscription names containing it are rejected.
/// </summary>
public static readonly string SUB_NAME_DELIMITER = "|";

/// <summary>Names of the exclusive (non-shared) durable subscriptions currently registered.</summary>
private readonly ISet<string> exclusiveDurableSubs = new HashSet<string>();

/// <summary>Shared durable subscription details, keyed by subscription name.</summary>
private readonly IDictionary<string, SubDetails> sharedDurableSubs = new ConcurrentDictionary<string, SubDetails>();

/// <summary>Shared volatile (non-durable) subscription details, keyed by subscription name.</summary>
private readonly IDictionary<string, SubDetails> sharedVolatileSubs = new ConcurrentDictionary<string, SubDetails>();
/// <summary>
/// Registers the consumer against the named subscription and returns the link
/// name to use for it.
/// </summary>
/// <param name="subscriptionName">Subscription name; must be non-empty and must not contain <see cref="SUB_NAME_DELIMITER"/>.</param>
/// <param name="consumerInfo">Consumer whose shared/durable flags select the naming scheme.</param>
/// <returns>
/// For an exclusive durable consumer, the subscription name itself; for shared
/// consumers, the name produced by the shared durable or shared volatile helper.
/// </returns>
/// <exception cref="ArgumentException">The subscription name is invalid or <paramref name="consumerInfo"/> is null.</exception>
/// <exception cref="IllegalStateException">The consumer is neither shared nor durable.</exception>
public string ReserveNextSubscriptionLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
{
    // The name is validated before the consumer info is checked.
    ValidateSubscriptionName(subscriptionName);

    if (consumerInfo == null)
    {
        throw new ArgumentException("Consumer info must not be null.");
    }

    if (!consumerInfo.IsShared)
    {
        if (!consumerInfo.IsDurable)
        {
            throw new IllegalStateException(
                "Non-shared non-durable sub link naming is not handled by the tracker.");
        }

        // Exclusive durable subscriptions use the subscription name as the link name.
        RegisterExclusiveDurableSub(subscriptionName);
        return subscriptionName;
    }

    return consumerInfo.IsDurable
        ? GetSharedDurableSubLinkName(subscriptionName, consumerInfo)
        : GetSharedVolatileSubLinkName(subscriptionName, consumerInfo);
}
/// <summary>
/// Rejects subscription names that cannot be used to build a link name.
/// </summary>
/// <param name="subscriptionName">Name to check.</param>
/// <exception cref="ArgumentException">
/// The name is null, empty, or contains <see cref="SUB_NAME_DELIMITER"/>.
/// </exception>
private void ValidateSubscriptionName(string subscriptionName)
{
    bool isMissing = subscriptionName == null || subscriptionName.Length == 0;
    if (isMissing)
    {
        throw new ArgumentException("Subscription name must not be null or empty.");
    }

    bool hasDelimiter = subscriptionName.Contains(SUB_NAME_DELIMITER);
    if (hasDelimiter)
    {
        throw new ArgumentException($"Subscription name must not contain '{SUB_NAME_DELIMITER}' character.");
    }
}
/// <summary>
/// Records <paramref name="consumerInfo"/> as a subscriber of the shared durable
/// subscription <paramref name="subscriptionName"/> and returns its link name.
/// </summary>
/// <param name="subscriptionName">Key of the shared durable subscription.</param>
/// <param name="consumerInfo">Consumer joining the subscription; supplies the destination, selector and client-id flag.</param>
/// <returns>
/// The durable link name derived from the subscription name, the client-id flag
/// and the subscription's total subscriber count after this consumer was added.
/// </returns>
/// <exception cref="NMSException">
/// An entry already exists under this name with a different destination or selector.
/// </exception>
private string GetSharedDurableSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
{
    IDestination topic = consumerInfo.Destination;
    string selector = consumerInfo.Selector;

    if (sharedDurableSubs.TryGetValue(subscriptionName, out SubDetails subDetails))
    {
        if (!subDetails.Matches(topic, selector))
        {
            throw new NMSException("Subscription details dont match existing subscriber.");
        }

        subDetails.AddSubscriber(consumerInfo);
    }
    else
    {
        subDetails = new SubDetails(topic, selector, consumerInfo);

        // Only a new entry is stored. Calling IDictionary.Add for a key that is
        // already present throws ArgumentException, which previously made every
        // second and later subscriber of the same subscription fail.
        sharedDurableSubs[subscriptionName] = subDetails;
    }

    int count = subDetails.TotalSubscriberCount();
    return GetDurableSubscriptionLinkName(subscriptionName, consumerInfo.IsExplicitClientId, count);
}
/// <summary>
/// Builds the durable link name for the <paramref name="count"/>-th subscriber.
/// </summary>
/// <param name="subscriptionName">Subscription name the link name is based on.</param>
/// <param name="hasClientID">Whether an explicit client id is in use; selects the naming form.</param>
/// <param name="count">Subscriber ordinal; values of 1 or less yield the first link name unchanged.</param>
/// <returns>
/// The first durable link name, followed for <paramref name="count"/> greater than 1 by
/// the count (preceded by <see cref="SUB_NAME_DELIMITER"/> only when a client id is present).
/// </returns>
private string GetDurableSubscriptionLinkName(string subscriptionName, bool hasClientID, int count)
{
    string firstName = GetFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
    if (count <= 1)
    {
        return firstName;
    }

    // Without a client id the first name already ends in the "global" suffix,
    // so the count is appended directly with no extra delimiter.
    string separator = hasClientID ? SUB_NAME_DELIMITER : string.Empty;
    return firstName + separator + count;
}
/// <summary>
/// Returns the link name used by the first subscriber of a durable subscription.
/// </summary>
/// <param name="subscriptionName">Subscription name; validated before use.</param>
/// <param name="hasClientID">Whether an explicit client id is in use.</param>
/// <returns>
/// The subscription name itself when a client id is present; otherwise the name
/// followed by <see cref="SUB_NAME_DELIMITER"/> and "global".
/// </returns>
/// <exception cref="ArgumentException">The subscription name fails validation.</exception>
public string GetFirstDurableSubscriptionLinkName(string subscriptionName, bool hasClientID)
{
    ValidateSubscriptionName(subscriptionName);

    return hasClientID
        ? subscriptionName
        : subscriptionName + SUB_NAME_DELIMITER + "global";
}
/// <summary>
/// Records <paramref name="consumerInfo"/> as a subscriber of the shared volatile
/// subscription <paramref name="subscriptionName"/> and returns its link name.
/// </summary>
/// <param name="subscriptionName">Key of the shared volatile subscription.</param>
/// <param name="consumerInfo">Consumer joining the subscription; supplies the destination, selector and client-id flag.</param>
/// <returns>
/// The subscription name, <see cref="SUB_NAME_DELIMITER"/>, then "volatile" (explicit
/// client id) or "global-volatile" (no explicit client id), followed by the
/// subscription's total subscriber count after this consumer was added.
/// </returns>
/// <exception cref="NMSException">
/// An entry already exists under this name with a different destination or selector.
/// </exception>
private String GetSharedVolatileSubLinkName(string subscriptionName, NmsConsumerInfo consumerInfo)
{
    IDestination topic = consumerInfo.Destination;
    string selector = consumerInfo.Selector;

    if (sharedVolatileSubs.TryGetValue(subscriptionName, out SubDetails subDetails))
    {
        if (!subDetails.Matches(topic, selector))
        {
            throw new NMSException("Subscription details dont match existing subscriber");
        }

        subDetails.AddSubscriber(consumerInfo);
    }
    else
    {
        subDetails = new SubDetails(topic, selector, consumerInfo);

        // Only a new entry is stored. Calling IDictionary.Add for a key that is
        // already present throws ArgumentException, which previously made every
        // second and later subscriber of the same subscription fail.
        sharedVolatileSubs[subscriptionName] = subDetails;
    }

    int count = subDetails.TotalSubscriberCount();
    string suffix = consumerInfo.IsExplicitClientId ? "volatile" : "global-volatile";
    return subscriptionName + SUB_NAME_DELIMITER + suffix + count;
}
/// <summary>
/// Adds <paramref name="subscriptionName"/> to the set of exclusive durable subscriptions.
/// </summary>
/// <param name="subscriptionName">Name of the exclusive durable subscription to record.</param>
private void RegisterExclusiveDurableSub(String subscriptionName) =>
    exclusiveDurableSubs.Add(subscriptionName);
/// <summary>
/// Checks whether an exclusive durable subscription with the given name is
/// currently recorded as active.
/// </summary>
/// <param name="subscriptionName">Name of the subscription to check.</param>
/// <returns>true if an exclusive durable subscription with this name is recorded.</returns>
public bool IsActiveExclusiveDurableSub(String subscriptionName) =>
    exclusiveDurableSubs.Contains(subscriptionName);
/// <summary>
/// Checks whether a shared durable subscription with the given name is
/// currently recorded as active.
/// </summary>
/// <param name="subscriptionName">Name of the subscription to check.</param>
/// <returns>true if a shared durable subscription with this name is recorded.</returns>
public bool IsActiveSharedDurableSub(string subscriptionName) =>
    sharedDurableSubs.ContainsKey(subscriptionName);
/// <summary>
/// Checks whether a durable subscription with the given name, exclusive or
/// shared, is currently recorded as active.
/// </summary>
/// <param name="subscriptionName">Name of the subscription to check.</param>
/// <returns>true if a durable subscription with this name is recorded.</returns>
public bool IsActiveDurableSub(string subscriptionName)
{
    // The exclusive set is consulted first; the shared map only if that misses.
    if (IsActiveExclusiveDurableSub(subscriptionName))
    {
        return true;
    }

    return IsActiveSharedDurableSub(subscriptionName);
}
/// <summary>
/// Checks whether a shared volatile subscription with the given name is
/// currently recorded as active.
/// </summary>
/// <param name="subscriptionName">Name of the subscription to check.</param>
/// <returns>true if a shared volatile subscription with this name is recorded.</returns>
public bool IsActiveSharedVolatileSub(String subscriptionName) =>
    sharedVolatileSubs.ContainsKey(subscriptionName);
/// <summary>
/// Removes the bookkeeping held for a consumer that has gone away.
/// </summary>
/// <remarks>
/// Nothing happens when the consumer has no subscription name. For shared
/// consumers the consumer is removed from the matching durable or volatile entry,
/// and the entry is dropped once it has no active subscribers. For exclusive
/// durable consumers the subscription name is removed from the exclusive set.
/// </remarks>
/// <param name="consumerInfo">The removed consumer; dereferenced without a null check.</param>
public void ConsumerRemoved(NmsConsumerInfo consumerInfo)
{
    string subscriptionName = consumerInfo.SubscriptionName;
    if (string.IsNullOrEmpty(subscriptionName))
    {
        return;
    }

    if (consumerInfo.IsShared)
    {
        IDictionary<string, SubDetails> sharedSubs =
            consumerInfo.IsDurable ? sharedDurableSubs : sharedVolatileSubs;
        RemoveSharedSubscriber(sharedSubs, subscriptionName, consumerInfo);
    }
    else if (consumerInfo.IsDurable)
    {
        exclusiveDurableSubs.Remove(subscriptionName);
    }
}

/// <summary>
/// Removes one subscriber from a shared subscription entry and drops the entry
/// when no active subscribers remain.
/// </summary>
private static void RemoveSharedSubscriber(
    IDictionary<string, SubDetails> sharedSubs, string subscriptionName, NmsConsumerInfo consumerInfo)
{
    // A single TryGetValue replaces ContainsKey plus the indexer, so the entry
    // cannot disappear between the existence check and the read.
    if (!sharedSubs.TryGetValue(subscriptionName, out SubDetails subDetails))
    {
        return;
    }

    subDetails.RemoveSubscriber(consumerInfo);
    if (subDetails.ActiveSubscribers() < 1)
    {
        sharedSubs.Remove(subscriptionName);
    }
}
/// <summary>
/// Details of one shared subscription: the destination and selector it was
/// created with, the set of currently registered subscribers, and a running
/// count of every subscriber ever added.
/// </summary>
private class SubDetails
{
    private readonly IDestination topic;
    private readonly String selector;
    private readonly ISet<NmsConsumerInfo> subscribers = new HashSet<NmsConsumerInfo>();

    // Incremented on every AddSubscriber call and never decremented.
    private int totalSubscriberCount;

    /// <summary>Creates the details and registers <paramref name="info"/> as the first subscriber.</summary>
    /// <exception cref="ArgumentException"><paramref name="topic"/> or <paramref name="info"/> is null.</exception>
    public SubDetails(IDestination topic, string selector, NmsConsumerInfo info)
    {
        if (topic == null)
        {
            throw new ArgumentException("Topic destination must not be null");
        }

        this.topic = topic;
        this.selector = selector;
        AddSubscriber(info);
    }

    /// <summary>Registers a subscriber and increments the total subscriber count.</summary>
    /// <exception cref="ArgumentException"><paramref name="info"/> is null.</exception>
    public void AddSubscriber(NmsConsumerInfo info)
    {
        if (info == null)
        {
            throw new ArgumentException("Consumer info must not be null");
        }

        totalSubscriberCount++;
        subscribers.Add(info);
    }

    /// <summary>Removes a subscriber from the active set; the total count is left unchanged.</summary>
    public void RemoveSubscriber(NmsConsumerInfo info) => subscribers.Remove(info);

    /// <summary>Number of subscribers currently in the active set.</summary>
    public int ActiveSubscribers() => subscribers.Count;

    /// <summary>Number of subscribers added over the lifetime of this instance.</summary>
    public int TotalSubscriberCount() => totalSubscriberCount;

    /// <summary>
    /// Tells whether the given destination and selector equal the ones this
    /// subscription was created with. Two null selectors are considered equal.
    /// </summary>
    public bool Matches(IDestination newTopic, string newSelector)
    {
        // The static string.Equals treats two nulls as equal and null vs non-null as different.
        return topic.Equals(newTopic) && string.Equals(selector, newSelector);
    }
}
}
}