blob: 0218ed82b4346a96a9d7568315c6917582166a50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.amqp;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import jakarta.jms.JMSRuntimeException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
/**
* Class used to track named subscriptions on a connection to allow verifying
* current usage and assigning appropriate link names.
*/
public class AmqpSubscriptionTracker {
Set<String> exclusiveDurableSubs = new HashSet<>();
Map<String, SubDetails> sharedDurableSubs = new HashMap<>();
Map<String, SubDetails> sharedVolatileSubs = new HashMap<>();
public String reserveNextSubscriptionLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
validateSubscriptionName(subscriptionName);
if(consumerInfo == null) {
throw new IllegalArgumentException("Consumer info must not be null.");
}
if (consumerInfo.isShared()) {
if (consumerInfo.isDurable()) {
return getSharedDurableSubLinkName(subscriptionName, consumerInfo);
} else {
return getSharedVolatileSubLinkName(subscriptionName, consumerInfo);
}
} else if (consumerInfo.isDurable()) {
registerExclusiveDurableSub(subscriptionName);
return subscriptionName;
} else {
throw new IllegalStateException("Non-shared non-durable sub link naming is not handled by the tracker.");
}
}
private void validateSubscriptionName(String subscriptionName) {
if(subscriptionName == null) {
throw new IllegalArgumentException("Subscription name must not be null.");
}
if(subscriptionName.isEmpty()) {
throw new IllegalArgumentException("Subscription name must not be empty.");
}
if(subscriptionName.contains(SUB_NAME_DELIMITER)) {
throw new IllegalArgumentException("Subscription name must not contain '" + SUB_NAME_DELIMITER +"' character.");
}
}
private String getSharedDurableSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
JmsDestination topic = consumerInfo.getDestination();
String selector = consumerInfo.getSelector();
SubDetails subDetails = null;
if(sharedDurableSubs.containsKey(subscriptionName)) {
subDetails = sharedDurableSubs.get(subscriptionName);
if(subDetails.matches(topic, selector)){
subDetails.addSubscriber(consumerInfo);
} else {
throw new JMSRuntimeException("Subscription details dont match existing subscriber.");
}
} else {
subDetails = new SubDetails(topic, selector, consumerInfo);
}
sharedDurableSubs.put(subscriptionName, subDetails);
int count = subDetails.totalSubscriberCount();
return getDurableSubscriptionLinkName(subscriptionName, consumerInfo.isExplicitClientID(), count);
}
private String getDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID, int count) {
String linkName = getFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
if(count > 1) {
if(hasClientID) {
linkName += SUB_NAME_DELIMITER + count;
} else {
linkName += count;
}
}
return linkName;
}
public String getFirstDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID) {
validateSubscriptionName(subscriptionName);
String receiverLinkName = subscriptionName;
if(!hasClientID) {
receiverLinkName += SUB_NAME_DELIMITER + "global";
}
return receiverLinkName;
}
private String getSharedVolatileSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
JmsDestination topic = consumerInfo.getDestination();
String selector = consumerInfo.getSelector();
SubDetails subDetails = null;
if(sharedVolatileSubs.containsKey(subscriptionName)) {
subDetails = sharedVolatileSubs.get(subscriptionName);
if(subDetails.matches(topic, selector)){
subDetails.addSubscriber(consumerInfo);
} else {
throw new JMSRuntimeException("Subscription details dont match existing subscriber");
}
} else {
subDetails = new SubDetails(topic, selector, consumerInfo);
}
sharedVolatileSubs.put(subscriptionName, subDetails);
String receiverLinkName = subscriptionName + SUB_NAME_DELIMITER;
int count = subDetails.totalSubscriberCount();
if (consumerInfo.isExplicitClientID()) {
receiverLinkName += "volatile" + count;
} else {
receiverLinkName += "global-volatile" + count;
}
return receiverLinkName;
}
private void registerExclusiveDurableSub(String subscriptionName) {
exclusiveDurableSubs.add(subscriptionName);
}
/**
* Checks if there is an exclusive durable subscription already
* recorded as active with the given subscription name.
*
* @param subscriptionName name of subscription to check
* @return true if there is an exclusive durable sub with this name already active
*/
public boolean isActiveExclusiveDurableSub(String subscriptionName) {
return exclusiveDurableSubs.contains(subscriptionName);
}
/**
* Checks if there is a shared durable subscription already
* recorded as active with the given subscription name.
*
* @param subscriptionName name of subscription to check
* @return true if there is a shared durable sub with this name already active
*/
public boolean isActiveSharedDurableSub(String subscriptionName) {
return sharedDurableSubs.containsKey(subscriptionName);
}
/**
* Checks if there is either a shared or exclusive durable subscription
* already recorded as active with the given subscription name.
*
* @param subscriptionName name of subscription to check
* @return true if there is a durable sub with this name already active
*/
public boolean isActiveDurableSub(String subscriptionName) {
return isActiveExclusiveDurableSub(subscriptionName) || isActiveSharedDurableSub(subscriptionName);
}
/**
* Checks if there is an shared volatile subscription already
* recorded as active with the given subscription name.
*
* @param subscriptionName name of subscription to check
* @return true if there is a shared volatile sub with this name already active
*/
public boolean isActiveSharedVolatileSub(String subscriptionName) {
return sharedVolatileSubs.containsKey(subscriptionName);
}
public void consumerRemoved(JmsConsumerInfo consumerInfo) {
String subscriptionName = consumerInfo.getSubscriptionName();
if (subscriptionName != null && !subscriptionName.isEmpty()) {
if (consumerInfo.isShared()) {
if (consumerInfo.isDurable()) {
if(sharedDurableSubs.containsKey(subscriptionName)) {
SubDetails subDetails = sharedDurableSubs.get(subscriptionName);
subDetails.removeSubscriber(consumerInfo);
int count = subDetails.activeSubscribers();
if(count < 1) {
sharedDurableSubs.remove(subscriptionName);
}
}
} else {
if(sharedVolatileSubs.containsKey(subscriptionName)) {
SubDetails subDetails = sharedVolatileSubs.get(subscriptionName);
subDetails.removeSubscriber(consumerInfo);
int count = subDetails.activeSubscribers();
if(count < 1) {
sharedVolatileSubs.remove(subscriptionName);
}
}
}
} else if (consumerInfo.isDurable()) {
exclusiveDurableSubs.remove(subscriptionName);
}
}
}
private static class SubDetails {
private JmsDestination topic = null;
private String selector = null;
private Set<JmsConsumerInfo> subscribers = new HashSet<>();
private int totalSubscriberCount;
public SubDetails(JmsDestination topic, String selector, JmsConsumerInfo info) {
if(topic == null) {
throw new IllegalArgumentException("Topic destination must not be null");
}
this.topic = topic;
this.selector = selector;
addSubscriber(info);
}
public void addSubscriber(JmsConsumerInfo info) {
if(info == null) {
throw new IllegalArgumentException("Consumer info must not be null");
}
totalSubscriberCount++;
subscribers.add(info);
}
public void removeSubscriber(JmsConsumerInfo info) {
subscribers.remove(info);
}
public int activeSubscribers() {
return subscribers.size();
}
public int totalSubscriberCount() {
return totalSubscriberCount;
}
public boolean matches(JmsDestination newTopic, String newSelector) {
if(!topic.equals(newTopic)) {
return false;
}
if (selector == null) {
return newSelector == null;
} else {
return selector.equals(newSelector);
}
}
}
}