| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.broker.region; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import javax.jms.InvalidSelectorException; |
| import javax.jms.JMSException; |
| |
| import org.apache.activemq.broker.Broker; |
| import org.apache.activemq.broker.ConnectionContext; |
| import org.apache.activemq.broker.region.cursors.PendingMessageCursor; |
| import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ConsumerInfo; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.MessageDispatch; |
| import org.apache.activemq.command.MessageId; |
| import org.apache.activemq.filter.MessageEvaluationContext; |
| import org.apache.activemq.store.TopicMessageStore; |
| import org.apache.activemq.usage.SystemUsage; |
| import org.apache.activemq.usage.Usage; |
| import org.apache.activemq.usage.UsageListener; |
| import org.apache.activemq.util.SubscriptionKey; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class); |
| private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>(); |
| private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); |
| private final SubscriptionKey subscriptionKey; |
| private final boolean keepDurableSubsActive; |
| private AtomicBoolean active = new AtomicBoolean(); |
| |
| public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) |
| throws JMSException { |
| super(broker,usageManager, context, info); |
| this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); |
| this.pending.setSystemUsage(usageManager); |
| this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); |
| this.keepDurableSubsActive = keepDurableSubsActive; |
| subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); |
| |
| } |
| |
| public final boolean isActive() { |
| return active.get(); |
| } |
| |
| public boolean isFull() { |
| return !active.get() || super.isFull(); |
| } |
| |
| public void gc() { |
| } |
| |
| /** |
| * store will have a pending ack for all durables, irrespective of the selector |
| * so we need to ack if node is un-matched |
| */ |
| public void unmatched(MessageReference node) throws IOException { |
| MessageAck ack = new MessageAck(); |
| ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE); |
| ack.setMessageID(node.getMessageId()); |
| node.getRegionDestination().acknowledge(this.getContext(), this, ack, node); |
| } |
| |
| @Override |
| protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { |
| // statically configured via maxPageSize |
| } |
| |
| public void add(ConnectionContext context, Destination destination) throws Exception { |
| super.add(context, destination); |
| // do it just once per destination |
| if (destinations.containsKey(destination.getActiveMQDestination())) { |
| return; |
| } |
| destinations.put(destination.getActiveMQDestination(), destination); |
| |
| if (active.get() || keepDurableSubsActive) { |
| Topic topic = (Topic)destination; |
| topic.activate(context, this); |
| if (pending.isEmpty(topic)) { |
| topic.recoverRetroactiveMessages(context, this); |
| } |
| this.enqueueCounter+=pending.size(); |
| } else if (destination.getMessageStore() != null) { |
| TopicMessageStore store = (TopicMessageStore)destination.getMessageStore(); |
| try { |
| this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); |
| } catch (IOException e) { |
| JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e); |
| jmsEx.setLinkedException(e); |
| throw jmsEx; |
| } |
| } |
| dispatchPending(); |
| } |
| |
| public void activate(SystemUsage memoryManager, ConnectionContext context, |
| ConsumerInfo info) throws Exception { |
| if (!active.get()) { |
| this.context = context; |
| this.info = info; |
| LOG.debug("Activating " + this); |
| if (!keepDurableSubsActive) { |
| for (Iterator<Destination> iter = destinations.values() |
| .iterator(); iter.hasNext();) { |
| Topic topic = (Topic) iter.next(); |
| add(context, topic); |
| topic.activate(context, this); |
| } |
| } |
| synchronized (pendingLock) { |
| pending.setSystemUsage(memoryManager); |
| pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); |
| pending.setMaxAuditDepth(getMaxAuditDepth()); |
| pending.setMaxProducersToAudit(getMaxProducersToAudit()); |
| pending.start(); |
| // If nothing was in the persistent store, then try to use the |
| // recovery policy. |
| if (pending.isEmpty()) { |
| for (Iterator<Destination> iter = destinations.values() |
| .iterator(); iter.hasNext();) { |
| Topic topic = (Topic) iter.next(); |
| topic.recoverRetroactiveMessages(context, this); |
| } |
| } |
| } |
| this.active.set(true); |
| dispatchPending(); |
| this.usageManager.getMemoryUsage().addUsageListener(this); |
| } |
| } |
| |
| public void deactivate(boolean keepDurableSubsActive) throws Exception { |
| LOG.debug("Deactivating " + this); |
| active.set(false); |
| this.usageManager.getMemoryUsage().removeUsageListener(this); |
| synchronized (pending) { |
| pending.stop(); |
| } |
| if (!keepDurableSubsActive) { |
| for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { |
| Topic topic = (Topic)iter.next(); |
| topic.deactivate(context, this); |
| } |
| } |
| |
| for (final MessageReference node : dispatched) { |
| // Mark the dispatched messages as redelivered for next time. |
| Integer count = redeliveredMessages.get(node.getMessageId()); |
| if (count != null) { |
| redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); |
| } else { |
| redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); |
| } |
| if (keepDurableSubsActive&& pending.isTransient()) { |
| synchronized (pending) { |
| pending.addMessageFirst(node); |
| } |
| } else { |
| node.decrementReferenceCount(); |
| } |
| } |
| synchronized(dispatched) { |
| dispatched.clear(); |
| } |
| if (!keepDurableSubsActive && pending.isTransient()) { |
| synchronized (pending) { |
| try { |
| pending.reset(); |
| while (pending.hasNext()) { |
| MessageReference node = pending.next(); |
| node.decrementReferenceCount(); |
| pending.remove(); |
| } |
| } finally { |
| pending.release(); |
| } |
| } |
| } |
| prefetchExtension = 0; |
| } |
| |
| |
| protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { |
| MessageDispatch md = super.createMessageDispatch(node, message); |
| Integer count = redeliveredMessages.get(node.getMessageId()); |
| if (count != null) { |
| md.setRedeliveryCounter(count.intValue()); |
| } |
| return md; |
| } |
| |
| public void add(MessageReference node) throws Exception { |
| if (!active.get() && !keepDurableSubsActive) { |
| return; |
| } |
| super.add(node); |
| } |
| |
| protected void dispatchPending() throws IOException { |
| if (isActive()) { |
| super.dispatchPending(); |
| } |
| } |
| |
| protected void doAddRecoveredMessage(MessageReference message) throws Exception { |
| synchronized(pending) { |
| pending.addRecoveredMessage(message); |
| } |
| } |
| |
| public int getPendingQueueSize() { |
| if (active.get() || keepDurableSubsActive) { |
| return super.getPendingQueueSize(); |
| } |
| // TODO: need to get from store |
| return 0; |
| } |
| |
| public void setSelector(String selector) throws InvalidSelectorException { |
| throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); |
| } |
| |
| protected boolean canDispatch(MessageReference node) { |
| return isActive(); |
| } |
| |
| protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { |
| node.getRegionDestination().acknowledge(context, this, ack, node); |
| redeliveredMessages.remove(node.getMessageId()); |
| node.decrementReferenceCount(); |
| } |
| |
| |
| public synchronized String toString() { |
| return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending=" |
| + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension; |
| } |
| |
| public SubscriptionKey getSubscriptionKey() { |
| return subscriptionKey; |
| } |
| |
| /** |
| * Release any references that we are holding. |
| */ |
| public void destroy() { |
| synchronized (pending) { |
| try { |
| |
| pending.reset(); |
| while (pending.hasNext()) { |
| MessageReference node = pending.next(); |
| node.decrementReferenceCount(); |
| } |
| |
| } finally { |
| pending.release(); |
| pending.clear(); |
| } |
| } |
| synchronized(dispatched) { |
| for (Iterator iter = dispatched.iterator(); iter.hasNext();) { |
| MessageReference node = (MessageReference) iter.next(); |
| node.decrementReferenceCount(); |
| } |
| dispatched.clear(); |
| } |
| setSlowConsumer(false); |
| } |
| |
| public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { |
| if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { |
| try { |
| dispatchPending(); |
| } catch (IOException e) { |
| LOG.warn("problem calling dispatchMatched", e); |
| } |
| } |
| } |
| |
| protected boolean isDropped(MessageReference node) { |
| return false; |
| } |
| |
| public boolean isKeepDurableSubsActive() { |
| return keepDurableSubsActive; |
| } |
| } |