| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.broker.region; |
| |
| import java.io.IOException; |
| import java.util.LinkedList; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.jms.JMSException; |
| import org.apache.activemq.ActiveMQMessageAudit; |
| import org.apache.activemq.broker.Broker; |
| import org.apache.activemq.broker.ConnectionContext; |
| import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; |
| import org.apache.activemq.broker.region.cursors.PendingMessageCursor; |
| import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; |
| import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; |
| import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; |
| import org.apache.activemq.command.ConsumerControl; |
| import org.apache.activemq.command.ConsumerInfo; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.MessageDispatch; |
| import org.apache.activemq.command.MessageDispatchNotification; |
| import org.apache.activemq.command.MessagePull; |
| import org.apache.activemq.command.Response; |
| import org.apache.activemq.transaction.Synchronization; |
| import org.apache.activemq.usage.SystemUsage; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TopicSubscription extends AbstractSubscription { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); |
| private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); |
| |
| protected PendingMessageCursor matched; |
| protected final SystemUsage usageManager; |
| protected AtomicLong dispatchedCounter = new AtomicLong(); |
| |
| boolean singleDestination = true; |
| Destination destination; |
| |
| private int maximumPendingMessages = -1; |
| private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); |
| private int discarded; |
| private final Object matchedListMutex = new Object(); |
| private final AtomicLong enqueueCounter = new AtomicLong(0); |
| private final AtomicLong dequeueCounter = new AtomicLong(0); |
| private int memoryUsageHighWaterMark = 95; |
| // allow duplicate suppression in a ring network of brokers |
| protected int maxProducersToAudit = 1024; |
| protected int maxAuditDepth = 1000; |
| protected boolean enableAudit = false; |
| protected ActiveMQMessageAudit audit; |
| protected boolean active = false; |
| |
| public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { |
| super(broker, context, info); |
| this.usageManager = usageManager; |
| String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; |
| if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { |
| this.matched = new VMPendingMessageCursor(false); |
| } else { |
| this.matched = new FilePendingMessageCursor(broker,matchedName,false); |
| } |
| } |
| |
| public void init() throws Exception { |
| this.matched.setSystemUsage(usageManager); |
| this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); |
| this.matched.start(); |
| if (enableAudit) { |
| audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit); |
| } |
| this.active=true; |
| } |
| |
| public void add(MessageReference node) throws Exception { |
| if (isDuplicate(node)) { |
| return; |
| } |
| enqueueCounter.incrementAndGet(); |
| if (!isFull() && matched.isEmpty() && !isSlave()) { |
| // if maximumPendingMessages is set we will only discard messages which |
| // have not been dispatched (i.e. we allow the prefetch buffer to be filled) |
| dispatch(node); |
| setSlowConsumer(false); |
| } else { |
| //we are slow |
| if(!isSlowConsumer()) { |
| setSlowConsumer(true); |
| for (Destination dest: destinations) { |
| dest.slowConsumer(getContext(), this); |
| } |
| } |
| if (maximumPendingMessages != 0) { |
| boolean warnedAboutWait = false; |
| while (active) { |
| synchronized (matchedListMutex) { |
| while (matched.isFull()) { |
| if (getContext().getStopping().get()) { |
| LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: " |
| + node.getMessageId()); |
| enqueueCounter.decrementAndGet(); |
| return; |
| } |
| if (!warnedAboutWait) { |
| LOG.info(toString() + ": Pending message cursor [" + matched |
| + "] is full, temp usage (" |
| + +matched.getSystemUsage().getTempUsage().getPercentUsage() |
| + "%) or memory usage (" |
| + matched.getSystemUsage().getMemoryUsage().getPercentUsage() |
| + "%) limit reached, blocking message add() pending the release of resources."); |
| warnedAboutWait = true; |
| } |
| matchedListMutex.wait(20); |
| } |
| //Temporary storage could be full - so just try to add the message |
| //see https://issues.apache.org/activemq/browse/AMQ-2475 |
| if (matched.tryAddMessageLast(node, 10)) { |
| break; |
| } |
| } |
| } |
| synchronized (matchedListMutex) { |
| |
| // NOTE - be careful about the slaveBroker! |
| if (maximumPendingMessages > 0) { |
| // calculate the high water mark from which point we |
| // will eagerly evict expired messages |
| int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); |
| if (maximumPendingMessages > 0 && maximumPendingMessages < max) { |
| max = maximumPendingMessages; |
| } |
| if (!matched.isEmpty() && matched.size() > max) { |
| removeExpiredMessages(); |
| } |
| // lets discard old messages as we are a slow consumer |
| while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { |
| int pageInSize = matched.size() - maximumPendingMessages; |
| // only page in a 1000 at a time - else we could |
| // blow da memory |
| pageInSize = Math.max(1000, pageInSize); |
| LinkedList<MessageReference> list = null; |
| MessageReference[] oldMessages=null; |
| synchronized(matched){ |
| list = matched.pageInList(pageInSize); |
| oldMessages = messageEvictionStrategy.evictMessages(list); |
| for (MessageReference ref : list) { |
| ref.decrementReferenceCount(); |
| } |
| } |
| int messagesToEvict = 0; |
| if (oldMessages != null){ |
| messagesToEvict = oldMessages.length; |
| for (int i = 0; i < messagesToEvict; i++) { |
| MessageReference oldMessage = oldMessages[i]; |
| discard(oldMessage); |
| } |
| } |
| // lets avoid an infinite loop if we are given a bad |
| // eviction strategy |
| // for a bad strategy lets just not evict |
| if (messagesToEvict == 0) { |
| LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy); |
| break; |
| } |
| } |
| } |
| } |
| dispatchMatched(); |
| } |
| } |
| } |
| |
| private boolean isDuplicate(MessageReference node) { |
| boolean duplicate = false; |
| if (enableAudit && audit != null) { |
| duplicate = audit.isDuplicate(node); |
| if (LOG.isDebugEnabled()) { |
| if (duplicate) { |
| LOG.debug("ignoring duplicate add: " + node.getMessageId()); |
| } |
| } |
| } |
| return duplicate; |
| } |
| |
| /** |
| * Discard any expired messages from the matched list. Called from a |
| * synchronized block. |
| * |
| * @throws IOException |
| */ |
| protected void removeExpiredMessages() throws IOException { |
| try { |
| matched.reset(); |
| while (matched.hasNext()) { |
| MessageReference node = matched.next(); |
| node.decrementReferenceCount(); |
| if (broker.isExpired(node)) { |
| matched.remove(); |
| dispatchedCounter.incrementAndGet(); |
| node.decrementReferenceCount(); |
| node.getRegionDestination().getDestinationStatistics().getExpired().increment(); |
| broker.messageExpired(getContext(), node, this); |
| break; |
| } |
| } |
| } finally { |
| matched.release(); |
| } |
| } |
| |
| public void processMessageDispatchNotification(MessageDispatchNotification mdn) { |
| synchronized (matchedListMutex) { |
| try { |
| matched.reset(); |
| while (matched.hasNext()) { |
| MessageReference node = matched.next(); |
| node.decrementReferenceCount(); |
| if (node.getMessageId().equals(mdn.getMessageId())) { |
| matched.remove(); |
| dispatchedCounter.incrementAndGet(); |
| node.decrementReferenceCount(); |
| break; |
| } |
| } |
| } finally { |
| matched.release(); |
| } |
| } |
| } |
| |
| public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { |
| // Handle the standard acknowledgment case. |
| if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { |
| if (context.isInTransaction()) { |
| context.getTransaction().addSynchronization(new Synchronization() { |
| |
| @Override |
| public void afterCommit() throws Exception { |
| synchronized (TopicSubscription.this) { |
| if (singleDestination && destination != null) { |
| destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); |
| } |
| } |
| dequeueCounter.addAndGet(ack.getMessageCount()); |
| dispatchMatched(); |
| } |
| }); |
| } else { |
| if (singleDestination && destination != null) { |
| destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); |
| destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); |
| } |
| dequeueCounter.addAndGet(ack.getMessageCount()); |
| } |
| dispatchMatched(); |
| return; |
| } else if (ack.isDeliveredAck()) { |
| // Message was delivered but not acknowledged: update pre-fetch |
| // counters. |
| // also. get these for a consumer expired message. |
| if (destination != null && !ack.isInTransaction()) { |
| destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); |
| destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); |
| } |
| dequeueCounter.addAndGet(ack.getMessageCount()); |
| dispatchMatched(); |
| return; |
| } else if (ack.isRedeliveredAck()) { |
| // nothing to do atm |
| return; |
| } |
| throw new JMSException("Invalid acknowledgment: " + ack); |
| } |
| |
| public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { |
| // not supported for topics |
| return null; |
| } |
| |
| public int getPendingQueueSize() { |
| return matched(); |
| } |
| |
| public int getDispatchedQueueSize() { |
| return (int)(dispatchedCounter.get() - dequeueCounter.get()); |
| } |
| |
| public int getMaximumPendingMessages() { |
| return maximumPendingMessages; |
| } |
| |
| public long getDispatchedCounter() { |
| return dispatchedCounter.get(); |
| } |
| |
| public long getEnqueueCounter() { |
| return enqueueCounter.get(); |
| } |
| |
| public long getDequeueCounter() { |
| return dequeueCounter.get(); |
| } |
| |
| /** |
| * @return the number of messages discarded due to being a slow consumer |
| */ |
| public int discarded() { |
| synchronized (matchedListMutex) { |
| return discarded; |
| } |
| } |
| |
| /** |
| * @return the number of matched messages (messages targeted for the |
| * subscription but not yet able to be dispatched due to the |
| * prefetch buffer being full). |
| */ |
| public int matched() { |
| synchronized (matchedListMutex) { |
| return matched.size(); |
| } |
| } |
| |
| /** |
| * Sets the maximum number of pending messages that can be matched against |
| * this consumer before old messages are discarded. |
| */ |
| public void setMaximumPendingMessages(int maximumPendingMessages) { |
| this.maximumPendingMessages = maximumPendingMessages; |
| } |
| |
| public MessageEvictionStrategy getMessageEvictionStrategy() { |
| return messageEvictionStrategy; |
| } |
| |
| /** |
| * Sets the eviction strategy used to decide which message to evict when the |
| * slow consumer needs to discard messages |
| */ |
| public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { |
| this.messageEvictionStrategy = messageEvictionStrategy; |
| } |
| |
| public int getMaxProducersToAudit() { |
| return maxProducersToAudit; |
| } |
| |
| public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { |
| this.maxProducersToAudit = maxProducersToAudit; |
| if (audit != null) { |
| audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); |
| } |
| } |
| |
| public int getMaxAuditDepth() { |
| return maxAuditDepth; |
| } |
| |
| public synchronized void setMaxAuditDepth(int maxAuditDepth) { |
| this.maxAuditDepth = maxAuditDepth; |
| if (audit != null) { |
| audit.setAuditDepth(maxAuditDepth); |
| } |
| } |
| |
| public boolean isEnableAudit() { |
| return enableAudit; |
| } |
| |
| public synchronized void setEnableAudit(boolean enableAudit) { |
| this.enableAudit = enableAudit; |
| if (enableAudit && audit==null) { |
| audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); |
| } |
| } |
| |
| // Implementation methods |
| // ------------------------------------------------------------------------- |
| public boolean isFull() { |
| return getDispatchedQueueSize() >= info.getPrefetchSize(); |
| } |
| |
| public int getInFlightSize() { |
| return getDispatchedQueueSize(); |
| } |
| |
| |
| /** |
| * @return true when 60% or more room is left for dispatching messages |
| */ |
| public boolean isLowWaterMark() { |
| return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); |
| } |
| |
| /** |
| * @return true when 10% or less room is left for dispatching messages |
| */ |
| public boolean isHighWaterMark() { |
| return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); |
| } |
| |
| /** |
| * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set |
| */ |
| public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { |
| this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; |
| } |
| |
| /** |
| * @return the memoryUsageHighWaterMark |
| */ |
| public int getMemoryUsageHighWaterMark() { |
| return this.memoryUsageHighWaterMark; |
| } |
| |
| /** |
| * @return the usageManager |
| */ |
| public SystemUsage getUsageManager() { |
| return this.usageManager; |
| } |
| |
| /** |
| * @return the matched |
| */ |
| public PendingMessageCursor getMatched() { |
| return this.matched; |
| } |
| |
| /** |
| * @param matched the matched to set |
| */ |
| public void setMatched(PendingMessageCursor matched) { |
| this.matched = matched; |
| } |
| |
| /** |
| * inform the MessageConsumer on the client to change it's prefetch |
| * |
| * @param newPrefetch |
| */ |
| public void updateConsumerPrefetch(int newPrefetch) { |
| if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { |
| ConsumerControl cc = new ConsumerControl(); |
| cc.setConsumerId(info.getConsumerId()); |
| cc.setPrefetch(newPrefetch); |
| context.getConnection().dispatchAsync(cc); |
| } |
| } |
| |
| private void dispatchMatched() throws IOException { |
| synchronized (matchedListMutex) { |
| if (!matched.isEmpty() && !isFull()) { |
| try { |
| matched.reset(); |
| |
| while (matched.hasNext() && !isFull()) { |
| MessageReference message = matched.next(); |
| message.decrementReferenceCount(); |
| matched.remove(); |
| // Message may have been sitting in the matched list a |
| // while |
| // waiting for the consumer to ak the message. |
| if (message.isExpired()) { |
| discard(message); |
| continue; // just drop it. |
| } |
| dispatch(message); |
| } |
| } finally { |
| matched.release(); |
| } |
| } |
| } |
| } |
| |
| private void dispatch(final MessageReference node) throws IOException { |
| Message message = (Message)node; |
| node.incrementReferenceCount(); |
| // Make sure we can dispatch a message. |
| MessageDispatch md = new MessageDispatch(); |
| md.setMessage(message); |
| md.setConsumerId(info.getConsumerId()); |
| md.setDestination(node.getRegionDestination().getActiveMQDestination()); |
| dispatchedCounter.incrementAndGet(); |
| // Keep track if this subscription is receiving messages from a single |
| // destination. |
| if (singleDestination) { |
| if (destination == null) { |
| destination = node.getRegionDestination(); |
| } else { |
| if (destination != node.getRegionDestination()) { |
| singleDestination = false; |
| } |
| } |
| } |
| if (info.isDispatchAsync()) { |
| md.setTransmitCallback(new Runnable() { |
| |
| public void run() { |
| node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); |
| node.getRegionDestination().getDestinationStatistics().getInflight().increment(); |
| node.decrementReferenceCount(); |
| } |
| }); |
| context.getConnection().dispatchAsync(md); |
| } else { |
| context.getConnection().dispatchSync(md); |
| node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); |
| node.getRegionDestination().getDestinationStatistics().getInflight().increment(); |
| node.decrementReferenceCount(); |
| } |
| } |
| |
| private void discard(MessageReference message) { |
| message.decrementReferenceCount(); |
| matched.remove(message); |
| discarded++; |
| if(destination != null) { |
| destination.getDestinationStatistics().getDequeues().increment(); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Discarding message " + message); |
| } |
| Destination dest = message.getRegionDestination(); |
| if (dest != null) { |
| dest.messageDiscarded(getContext(), this, message); |
| } |
| broker.getRoot().sendToDeadLetterQueue(getContext(), message, this); |
| } |
| |
| @Override |
| public String toString() { |
| return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" |
| + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); |
| } |
| |
| public void destroy() { |
| this.active=false; |
| synchronized (matchedListMutex) { |
| try { |
| matched.destroy(); |
| } catch (Exception e) { |
| LOG.warn("Failed to destroy cursor", e); |
| } |
| } |
| setSlowConsumer(false); |
| } |
| |
| @Override |
| public int getPrefetchSize() { |
| return info.getPrefetchSize(); |
| } |
| |
| } |