/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
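/**
 * Region implementation for topic destinations. On top of the consumer
 * handling inherited from {@link AbstractRegion}, it keeps a registry of
 * durable topic subscriptions keyed by {@link SubscriptionKey} (client id plus
 * subscription name) and re-creates the subscriptions recorded in a
 * destination's {@link TopicMessageStore} in
 * {@code addSubscriptionsForDestination}.
 */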
public class TopicRegion extends AbstractRegion {
// Logger shared by all instances of this class.
private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
// Registry of the durable subscriptions known to this region, keyed by client id and subscription name.
protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
// Sequence used for the synthetic session id below and for the consumer ids handed out by createConsumerId().
private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
// Session id, under the placeholder connection id "OFFLINE", that parents the consumer ids of recovered subscriptions.
// Must stay declared after recoveredDurableSubIdGenerator: its initializer draws a value from that generator.
private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
// Passed to DurableTopicSubscription on construction and on deactivate().
// NOTE(review): the exact effect is defined in DurableTopicSubscription - confirm there.
private boolean keepDurableSubsActive;
/**
 * Creates a topic region. Every argument is handed unchanged to the
 * {@link AbstractRegion} constructor; no additional setup happens here.
 *
 * @param broker the region broker this region belongs to
 * @param destinationStatistics statistics object passed to the superclass
 * @param memoryManager system usage object passed to the superclass
 * @param taskRunnerFactory task runner factory passed to the superclass
 * @param destinationFactory destination factory passed to the superclass
 */
public TopicRegion(RegionBroker broker,
        DestinationStatistics destinationStatistics,
        SystemUsage memoryManager,
        TaskRunnerFactory taskRunnerFactory,
        DestinationFactory destinationFactory) {
    super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
/**
 * Registers a consumer with this region.
 * <p>
 * Non-durable consumers are delegated straight to the superclass. For a
 * durable consumer the subscription registered under the client id and
 * subscription name is looked up:
 * <ul>
 * <li>none registered: the superclass adds the consumer and the subscription
 * is then read back from the registry;</li>
 * <li>registered but with a different selector or destination: the old
 * subscription is deleted from every {@link Topic}, removed, and the consumer
 * is added again from scratch;</li>
 * <li>registered and unchanged: the subscription is re-keyed under the new
 * consumer id.</li>
 * </ul>
 * In all durable cases the resulting subscription is activated before it is
 * returned.
 *
 * @param context connection the consumer belongs to; supplies the client id
 * @param info description of the consumer being added
 * @return the subscription serving the consumer
 * @throws JMSException if the durable subscription is already active, or if
 *         no subscription is registered under the key after the superclass
 *         added the consumer
 * @throws Exception whatever the superclass or the destinations propagate
 */
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
    if (!info.isDurable()) {
        return super.addConsumer(context, info);
    }

    ActiveMQDestination target = info.getDestination();
    if (!target.isPattern()) {
        // Make sure the destination is created.
        lookup(context, target, true);
    }

    String clientId = context.getClientId();
    String subscriptionName = info.getSubscriptionName();
    SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
    DurableTopicSubscription durableSub = durableSubscriptions.get(key);

    if (durableSub == null) {
        // Nothing registered yet: let the superclass add the consumer, then read the result back.
        super.addConsumer(context, info);
        durableSub = durableSubscriptions.get(key);
        if (durableSub == null) {
            throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
                + " subscriberName: " + key.getSubscriptionName());
        }
    } else {
        if (durableSub.isActive()) {
            throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
        }
        if (hasDurableSubChanged(info, durableSub.getConsumerInfo())) {
            // Selector or destination differs: drop the old subscription, then add the new one.
            durableSubscriptions.remove(key);
            synchronized (destinationsMutex) {
                for (Destination dest : destinations.values()) {
                    // Only Topic instances are touched, to account for virtual destinations.
                    if (dest instanceof Topic) {
                        ((Topic) dest).deleteSubscription(context, key);
                    }
                }
            }
            super.removeConsumer(context, durableSub.getConsumerInfo());
            super.addConsumer(context, info);
            durableSub = durableSubscriptions.get(key);
        } else {
            // Same subscription, new consumer: move it to the new consumer id key.
            ConsumerId previousId = durableSub.getConsumerInfo().getConsumerId();
            if (previousId != null) {
                subscriptions.remove(previousId);
            }
            subscriptions.put(info.getConsumerId(), durableSub);
        }
    }

    durableSub.activate(usageManager, context, info);
    return durableSub;
}
/**
 * Detaches a consumer from this region.
 * <p>
 * A non-durable consumer is removed by the superclass. A durable consumer
 * only has its subscription deactivated (the subscription stays registered);
 * if no subscription is registered under its key nothing happens.
 *
 * @param context connection the consumer belongs to; supplies the client id
 * @param info description of the consumer being removed
 * @throws Exception whatever the superclass or the deactivation propagates
 */
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
    if (!info.isDurable()) {
        super.removeConsumer(context, info);
        return;
    }

    SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
    DurableTopicSubscription durableSub = durableSubscriptions.get(key);
    if (durableSub == null) {
        return;
    }
    durableSub.deactivate(keepDurableSubsActive);
}
/**
 * Permanently removes the durable subscription identified by the client id
 * and subscription name carried in {@code info}.
 * <p>
 * The subscription is only taken out of the registry once it is known to be
 * inactive, so a rejected request leaves the registry unchanged. It is then
 * deleted from every {@link Topic} of this region and finally either removed
 * as a consumer (when it is still registered under its consumer id) or
 * destroyed directly.
 *
 * @param context connection issuing the request
 * @param info identifies the subscription to remove
 * @throws InvalidDestinationException if no durable subscription is
 *         registered under the key
 * @throws JMSException if the subscription is currently active
 * @throws Exception whatever the destinations or the superclass propagate
 */
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
    SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
    DurableTopicSubscription sub = durableSubscriptions.get(key);
    if (sub == null) {
        throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
    }
    if (sub.isActive()) {
        // Checked before touching the registry so an in-use subscription stays tracked.
        throw new JMSException("Durable consumer is in use");
    }
    // Conditional remove: only the caller that actually unregisters this instance proceeds.
    if (!durableSubscriptions.remove(key, sub)) {
        throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
    }

    synchronized (destinationsMutex) {
        for (Destination dest : destinations.values()) {
            // Only Topic instances are touched, to account for virtual destinations.
            if (dest instanceof Topic) {
                ((Topic) dest).deleteSubscription(context, key);
            }
        }
    }

    // The subscriptions map is keyed by ConsumerId, so look up by id rather than by ConsumerInfo.
    ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
    if (consumerId != null && subscriptions.get(consumerId) != null) {
        super.removeConsumer(context, sub.getConsumerInfo());
    } else {
        // try destroying inactive subscriptions
        destroySubscription(sub);
    }
}
/**
 * Returns a one-line summary with the number of destinations, the number of
 * subscriptions and the memory usage percentage of this region.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("TopicRegion: destinations=");
    text.append(destinations.size());
    text.append(", subscriptions=").append(subscriptions.size());
    text.append(", memory=").append(usageManager.getMemoryUsage().getPercentUsage()).append("%");
    return text.toString();
}
/**
 * Collects the subscriptions that apply to {@code dest}, extending the
 * superclass result with durable subscriptions.
 * <p>
 * When the destination has a message store, two passes run:
 * <ol>
 * <li>every subscription recorded in the store is restored, creating a
 * {@link DurableTopicSubscription} through {@code createSubscription} when
 * none is registered under its key yet;</li>
 * <li>every other registered durable subscription whose {@code matches}
 * accepts the destination (for example via a wildcard) is added as well.</li>
 * </ol>
 * Each subscription added here is also attached to {@code dest}. Without a
 * message store the superclass result is returned unchanged.
 *
 * @param context connection context used when attaching subscriptions
 * @param dest destination whose subscriptions are being assembled
 * @return the superclass list, extended in place with the durable subscriptions
 * @throws Exception whatever the store, the destination or the superclass propagate
 */
@Override
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
    List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
    Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
    TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
    if (store == null) {
        return rc;
    }

    // Eagerly recover the durable subscriptions recorded in the store.
    for (SubscriptionInfo info : store.getAllSubscriptions()) {
        LOG.debug("Restoring durable subscription: {}", info);
        SubscriptionKey key = new SubscriptionKey(info);
        // A single durable sub may be subscribing to multiple topics,
        // so it might exist already.
        DurableTopicSubscription sub = durableSubscriptions.get(key);
        ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
        if (sub == null) {
            // Build a stand-in connection context for the offline client.
            ConnectionContext c = new ConnectionContext();
            c.setBroker(context.getBroker());
            c.setClientId(key.getClientId());
            c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
            sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
        }
        // add() reports false when the subscription was already collected.
        if (!dupChecker.add(sub)) {
            continue;
        }
        rc.add(sub);
        dest.addSubscription(context, sub);
    }

    // Now perhaps there are other durable subscriptions (via wildcard)
    // that would match this destination.
    for (DurableTopicSubscription sub : durableSubscriptions.values()) {
        // Skip over subscriptions that we already added.
        if (dupChecker.contains(sub)) {
            continue;
        }
        if (sub.matches(dest.getActiveMQDestination())) {
            rc.add(sub);
            dest.addSubscription(context, sub);
        }
    }
    return rc;
}
/**
 * Builds a {@link ConsumerInfo} for a subscription read from a store: the
 * selector, subscription name and subscribed destination are copied from
 * {@code info} and a freshly generated consumer id is assigned.
 *
 * @param info stored description of the durable subscription
 * @return a new consumer info populated from {@code info}
 */
private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
    ConsumerInfo inactive = new ConsumerInfo();
    inactive.setSelector(info.getSelector());
    inactive.setSubscriptionName(info.getSubscriptionName());
    inactive.setDestination(info.getSubscribedDestination());

    ConsumerId generatedId = createConsumerId();
    inactive.setConsumerId(generatedId);
    return inactive;
}
/**
 * Creates a consumer id under the shared recovered-subscription session,
 * using the next value of the recovered-subscription sequence.
 *
 * @return a new consumer id
 */
private ConsumerId createConsumerId() {
    long nextValue = recoveredDurableSubIdGenerator.getNextSequenceId();
    return new ConsumerId(recoveredDurableSubSessionId, nextValue);
}
/**
 * Applies the broker's destination policy to a topic. Nothing happens when
 * the broker has no destination policy or the policy has no entry for the
 * destination.
 *
 * @param topic topic to configure
 * @param destination destination used to look up the policy entry
 */
protected void configureTopic(Topic topic, ActiveMQDestination destination) {
    if (broker.getDestinationPolicy() == null) {
        return;
    }
    PolicyEntry policy = broker.getDestinationPolicy().getEntryFor(destination);
    if (policy == null) {
        return;
    }
    policy.configure(broker, topic);
}
/**
 * Creates the subscription object for a consumer.
 * <p>
 * A non-durable consumer gets a new {@link TopicSubscription}, configured from
 * the destination policy entry (if any) and initialised. A durable consumer
 * gets a new {@link DurableTopicSubscription}, configured the same way and
 * registered under its client id and subscription name.
 *
 * @param context connection the consumer belongs to; supplies the client id
 * @param info description of the consumer
 * @return the newly created subscription
 * @throws JMSException if a durable subscription is requested on an advisory
 *         topic, if a durable subscription is already registered under the
 *         same key, or if creating the non-durable subscription fails (the
 *         failure is attached as the linked exception)
 */
@Override
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
    ActiveMQDestination destination = info.getDestination();

    if (!info.isDurable()) {
        try {
            TopicSubscription plainSub = new TopicSubscription(broker, context, info, usageManager);
            // Configure the subscription depending on the destination.
            if (destination != null && broker.getDestinationPolicy() != null) {
                PolicyEntry policy = broker.getDestinationPolicy().getEntryFor(destination);
                if (policy != null) {
                    policy.configure(broker, usageManager, plainSub);
                }
            }
            plainSub.init();
            return plainSub;
        } catch (Exception e) {
            LOG.error("Failed to create TopicSubscription ", e);
            JMSException failure = new JMSException("Couldn't create TopicSubscription");
            failure.setLinkedException(e);
            throw failure;
        }
    }

    if (AdvisorySupport.isAdvisoryTopic(destination)) {
        throw new JMSException("Cannot create a durable subscription for an advisory Topic");
    }

    SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
    if (durableSubscriptions.get(key) != null) {
        throw new JMSException("That durable subscription is already active.");
    }

    DurableTopicSubscription durableSub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
    if (destination != null && broker.getDestinationPolicy() != null) {
        PolicyEntry policy = broker.getDestinationPolicy().getEntryFor(destination);
        if (policy != null) {
            policy.configure(broker, usageManager, durableSub);
        }
    }
    durableSubscriptions.put(key, durableSub);
    return durableSub;
}
/**
 * Tells whether two consumer descriptions differ in selector or destination.
 *
 * @param info1 first consumer description; its destination must not be null
 * @param info2 second consumer description
 * @return {@code true} if exactly one selector is null, if both are non-null
 *         and unequal, or if the destinations are unequal
 */
private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
    String firstSelector = info1.getSelector();
    String secondSelector = info2.getSelector();

    boolean firstPresent = firstSelector != null;
    boolean secondPresent = secondSelector != null;
    if (firstPresent != secondPresent) {
        return true;
    }
    if (firstPresent && !firstSelector.equals(secondSelector)) {
        return true;
    }

    boolean sameDestination = info1.getDestination().equals(info2.getDestination());
    return !sameDestination;
}
/**
 * Returns the superclass's set of inactive destinations with every
 * non-topic destination removed. The set obtained from the superclass is
 * filtered in place and returned.
 *
 * @return the inactive destinations that are topics
 */
@Override
protected Set<ActiveMQDestination> getInactiveDestinations() {
    Set<ActiveMQDestination> candidates = super.getInactiveDestinations();
    Iterator<ActiveMQDestination> cursor = candidates.iterator();
    while (cursor.hasNext()) {
        ActiveMQDestination candidate = cursor.next();
        if (candidate.isTopic()) {
            continue;
        }
        cursor.remove();
    }
    return candidates;
}
/**
 * @return the current value of the keepDurableSubsActive flag
 */
public boolean isKeepDurableSubsActive() {
    return this.keepDurableSubsActive;
}
/**
 * Sets the flag that is passed to durable subscriptions when they are
 * created and when they are deactivated by this region.
 *
 * @param keepDurableSubsActive the new flag value
 */
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
    this.keepDurableSubsActive = keepDurableSubsActive;
}
/**
 * Checks whether a durable subscription is registered under the given key.
 *
 * @param key client id and subscription name to look up; must not be null
 * @return {@code true} if the registry holds a subscription for {@code key}
 */
public boolean durableSubscriptionExists(SubscriptionKey key) {
    // The registry is a ConcurrentHashMap, which never stores null values,
    // so a non-null lookup result is equivalent to key presence.
    DurableTopicSubscription registered = this.durableSubscriptions.get(key);
    return registered != null;
}
}