blob: 6f654a46229d2cf03d05be8f543241fbe6d59975 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.*;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class TopicSubscription extends AbstractSubscription {
private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
protected PendingMessageCursor matched;
protected final SystemUsage usageManager;
boolean singleDestination = true;
Destination destination;
private final Scheduler scheduler;
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private final AtomicInteger discarded = new AtomicInteger();
private final Object matchedListMutex = new Object();
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
protected int maxProducersToAudit = 1024;
protected int maxAuditDepth = 1000;
protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit;
protected boolean active = false;
protected boolean discarding = false;
private boolean useTopicSubscriptionInflightStats = true;
//Used for inflight message size calculations
protected final Object dispatchLock = new Object();
protected final List<DispatchedNode> dispatched = new ArrayList<>();
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) {
this.matched = new VMPendingMessageCursor(false);
} else {
this.matched = new FilePendingMessageCursor(broker,matchedName,false);
}
this.scheduler = broker.getScheduler();
}
public void init() throws Exception {
this.matched.setSystemUsage(usageManager);
this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.matched.start();
if (enableAudit) {
audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
}
this.active=true;
}
@Override
public void add(MessageReference node) throws Exception {
if (isDuplicate(node)) {
return;
}
// Lets use an indirect reference so that we can associate a unique
// locator /w the message.
node = new IndirectMessageReference(node.getMessage());
getSubscriptionStatistics().getEnqueues().increment();
synchronized (matchedListMutex) {
// if this subscriber is already discarding a message, we don't want to add
// any more messages to it as those messages can only be advisories generated in the process,
// which can trigger the recursive call loop
if (discarding) return;
if (!isFull() && matched.isEmpty()) {
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
setSlowConsumer(false);
} else {
if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
// Slow consumers should log and set their state as such.
if (!isSlowConsumer()) {
LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
setSlowConsumer(true);
for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);
}
}
}
if (maximumPendingMessages != 0) {
boolean warnedAboutWait = false;
while (active) {
while (matched.isFull()) {
if (getContext().getStopping().get()) {
LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
getSubscriptionStatistics().getEnqueues().decrement();
return;
}
if (!warnedAboutWait) {
LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
new Object[]{
toString(),
matched,
matched.getSystemUsage().getTempUsage().getPercentUsage(),
matched.getSystemUsage().getMemoryUsage().getPercentUsage()
});
warnedAboutWait = true;
}
matchedListMutex.wait(20);
}
// Temporary storage could be full - so just try to add the message
// see https://issues.apache.org/activemq/browse/AMQ-2475
if (matched.tryAddMessageLast(node, 10)) {
break;
}
}
if (maximumPendingMessages > 0) {
// calculate the high water mark from which point we
// will eagerly evict expired messages
int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
max = maximumPendingMessages;
}
if (!matched.isEmpty() && matched.size() > max) {
removeExpiredMessages();
}
// lets discard old messages as we are a slow consumer
while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
int pageInSize = matched.size() - maximumPendingMessages;
// only page in a 1000 at a time - else we could blow the memory
pageInSize = Math.max(1000, pageInSize);
LinkedList<MessageReference> list = null;
MessageReference[] oldMessages=null;
synchronized(matched){
list = matched.pageInList(pageInSize);
oldMessages = messageEvictionStrategy.evictMessages(list);
for (MessageReference ref : list) {
ref.decrementReferenceCount();
}
}
int messagesToEvict = 0;
if (oldMessages != null){
messagesToEvict = oldMessages.length;
for (int i = 0; i < messagesToEvict; i++) {
MessageReference oldMessage = oldMessages[i];
discard(oldMessage);
}
}
// lets avoid an infinite loop if we are given a bad eviction strategy
// for a bad strategy lets just not evict
if (messagesToEvict == 0) {
LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{
destination, messageEvictionStrategy, list.size()
});
break;
}
}
}
dispatchMatched();
}
}
}
}
private boolean isDuplicate(MessageReference node) {
boolean duplicate = false;
if (enableAudit && audit != null) {
duplicate = audit.isDuplicate(node);
if (LOG.isDebugEnabled()) {
if (duplicate) {
LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId());
}
}
}
return duplicate;
}
/**
* Discard any expired messages from the matched list. Called from a
* synchronized block.
*
* @throws IOException
*/
protected void removeExpiredMessages() throws IOException {
try {
matched.reset();
while (matched.hasNext()) {
MessageReference node = matched.next();
node.decrementReferenceCount();
if (node.isExpired()) {
matched.remove();
node.decrementReferenceCount();
if (broker.isExpired(node)) {
((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
broker.messageExpired(getContext(), node, this);
}
break;
}
}
} finally {
matched.release();
}
}
@Override
public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
synchronized (matchedListMutex) {
try {
matched.reset();
while (matched.hasNext()) {
MessageReference node = matched.next();
node.decrementReferenceCount();
if (node.getMessageId().equals(mdn.getMessageId())) {
synchronized(dispatchLock) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
node.decrementReferenceCount();
}
break;
}
}
} finally {
matched.release();
}
}
}
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
super.acknowledge(context, ack);
if (ack.isStandardAck()) {
updateStatsOnAck(context, ack);
} else if (ack.isPoisonAck()) {
if (ack.isInTransaction()) {
throw new JMSException("Poison ack cannot be transacted: " + ack);
}
updateStatsOnAck(context, ack);
contractPrefetchExtension(ack.getMessageCount());
} else if (ack.isIndividualAck()) {
updateStatsOnAck(context, ack);
if (ack.isInTransaction()) {
expandPrefetchExtension(1);
}
} else if (ack.isExpiredAck()) {
updateStatsOnAck(ack);
contractPrefetchExtension(ack.getMessageCount());
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch counters.
expandPrefetchExtension(ack.getMessageCount());
} else if (ack.isRedeliveredAck()) {
// No processing for redelivered needed
return;
} else {
throw new JMSException("Invalid acknowledgment: " + ack);
}
dispatchMatched();
}
private void updateStatsOnAck(final ConnectionContext context, final MessageAck ack) {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterRollback() {
contractPrefetchExtension(ack.getMessageCount());
}
@Override
public void afterCommit() throws Exception {
contractPrefetchExtension(ack.getMessageCount());
updateStatsOnAck(ack);
dispatchMatched();
}
});
} else {
updateStatsOnAck(ack);
}
}
@Override
public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
// The slave should not deliver pull messages.
if (getPrefetchSize() == 0) {
final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount();
prefetchExtension.set(pull.getQuantity());
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Send a NULL message to signal nothing pending.
dispatch(null);
prefetchExtension.set(0);
}
if (pull.getTimeout() > 0) {
scheduler.executeAfterDelay(new Runnable() {
@Override
public void run() {
pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
}
}, pull.getTimeout());
}
}
}
return null;
}
/**
* Occurs when a pull times out. If nothing has been dispatched since the
* timeout was setup, then send the NULL message.
*/
private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
synchronized (matchedListMutex) {
if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) {
try {
dispatch(null);
} catch (Exception e) {
context.getConnection().serviceException(e);
} finally {
prefetchExtension.set(0);
}
}
}
}
/**
* Update the statistics on message ack.
* @param ack
*/
private void updateStatsOnAck(final MessageAck ack) {
//Allow disabling inflight stats to save memory usage
if (isUseTopicSubscriptionInflightStats()) {
synchronized(dispatchLock) {
boolean inAckRange = false;
List<DispatchedNode> removeList = new ArrayList<>();
for (final DispatchedNode node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
removeList.add(node);
if (ack.getLastMessageId().equals(messageId)) {
break;
}
}
}
for (final DispatchedNode node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
final Destination destination = node.getDestination();
incrementStatsOnAck(destination, ack, 1);
if (!ack.isInTransaction()) {
contractPrefetchExtension(1);
}
}
}
} else {
if (singleDestination && destination != null) {
incrementStatsOnAck(destination, ack, ack.getMessageCount());
}
if (!ack.isInTransaction()) {
contractPrefetchExtension(ack.getMessageCount());
}
}
}
private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
getSubscriptionStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count);
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
}
}
@Override
public int countBeforeFull() {
return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
}
@Override
public int getPendingQueueSize() {
return matched();
}
@Override
public long getPendingMessageSize() {
return matched.messageSize();
}
@Override
public int getDispatchedQueueSize() {
return (int)(getSubscriptionStatistics().getDispatched().getCount() -
getSubscriptionStatistics().getDequeues().getCount());
}
public int getMaximumPendingMessages() {
return maximumPendingMessages;
}
@Override
public long getDispatchedCounter() {
return getSubscriptionStatistics().getDispatched().getCount();
}
@Override
public long getEnqueueCounter() {
return getSubscriptionStatistics().getEnqueues().getCount();
}
@Override
public long getDequeueCounter() {
return getSubscriptionStatistics().getDequeues().getCount();
}
/**
* @return the number of messages discarded due to being a slow consumer
*/
public int discarded() {
return discarded.get();
}
/**
* @return the number of matched messages (messages targeted for the
* subscription but not yet able to be dispatched due to the
* prefetch buffer being full).
*/
public int matched() {
return matched.size();
}
/**
* Sets the maximum number of pending messages that can be matched against
* this consumer before old messages are discarded.
*/
public void setMaximumPendingMessages(int maximumPendingMessages) {
this.maximumPendingMessages = maximumPendingMessages;
}
public MessageEvictionStrategy getMessageEvictionStrategy() {
return messageEvictionStrategy;
}
/**
* Sets the eviction strategy used to decide which message to evict when the
* slow consumer needs to discard messages
*/
public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
this.messageEvictionStrategy = messageEvictionStrategy;
}
public synchronized int getMaxProducersToAudit() {
return maxProducersToAudit;
}
public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
if (audit != null) {
audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
}
}
public synchronized int getMaxAuditDepth() {
return maxAuditDepth;
}
public synchronized void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
if (audit != null) {
audit.setAuditDepth(maxAuditDepth);
}
}
public boolean isEnableAudit() {
return enableAudit;
}
public synchronized void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
if (enableAudit && audit == null) {
audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
}
}
// Implementation methods
// -------------------------------------------------------------------------
@Override
public boolean isFull() {
return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
}
@Override
public int getInFlightSize() {
return getDispatchedQueueSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
@Override
public boolean isLowWaterMark() {
return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
@Override
public boolean isHighWaterMark() {
return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
}
/**
* @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
*/
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
}
/**
* @return the memoryUsageHighWaterMark
*/
public int getMemoryUsageHighWaterMark() {
return this.memoryUsageHighWaterMark;
}
/**
* @return the usageManager
*/
public SystemUsage getUsageManager() {
return this.usageManager;
}
/**
* @return the matched
*/
public PendingMessageCursor getMatched() {
return this.matched;
}
/**
* @param matched the matched to set
*/
public void setMatched(PendingMessageCursor matched) {
this.matched = matched;
}
/**
* inform the MessageConsumer on the client to change it's prefetch
*
* @param newPrefetch
*/
@Override
public void updateConsumerPrefetch(int newPrefetch) {
if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
ConsumerControl cc = new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
cc.setPrefetch(newPrefetch);
context.getConnection().dispatchAsync(cc);
}
}
private void dispatchMatched() throws IOException {
synchronized (matchedListMutex) {
if (!matched.isEmpty() && !isFull()) {
try {
matched.reset();
while (matched.hasNext() && !isFull()) {
MessageReference message = matched.next();
message.decrementReferenceCount();
matched.remove();
// Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message.
if (message.isExpired()) {
discard(message);
continue; // just drop it.
}
dispatch(message);
}
} finally {
matched.release();
}
}
}
}
private void dispatch(final MessageReference node) throws IOException {
Message message = node != null ? node.getMessage() : null;
if (node != null) {
node.incrementReferenceCount();
}
// Make sure we can dispatch a message.
MessageDispatch md = new MessageDispatch();
md.setMessage(message);
md.setConsumerId(info.getConsumerId());
if (node != null) {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
synchronized(dispatchLock) {
getSubscriptionStatistics().getDispatched().increment();
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
}
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
if (destination == null) {
destination = (Destination)node.getRegionDestination();
} else {
if (destination != node.getRegionDestination()) {
singleDestination = false;
}
}
}
if (getPrefetchSize() == 0) {
decrementPrefetchExtension(1);
}
}
if (info.isDispatchAsync()) {
if (node != null) {
md.setTransmitCallback(new TransmitCallback() {
@Override
public void onSuccess() {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
@Override
public void onFailure() {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
});
}
context.getConnection().dispatchAsync(md);
} else {
context.getConnection().dispatchSync(md);
if (node != null) {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
}
}
private void discard(MessageReference message) {
discarding = true;
try {
message.decrementReferenceCount();
matched.remove(message);
discarded.incrementAndGet();
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
}
LOG.debug("{}, discarding message {}", this, message);
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
dest.messageDiscarded(getContext(), this, message);
}
broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
} finally {
discarding = false;
}
}
@Override
public String toString() {
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get()
+ ", usePrefetchExtension=" + isUsePrefetchExtension();
}
@Override
public void destroy() {
this.active=false;
synchronized (matchedListMutex) {
try {
matched.destroy();
} catch (Exception e) {
LOG.warn("Failed to destroy cursor", e);
}
}
setSlowConsumer(false);
synchronized(dispatchLock) {
dispatched.clear();
}
}
@Override
public int getPrefetchSize() {
return info.getPrefetchSize();
}
@Override
public void setPrefetchSize(int newSize) {
info.setPrefetchSize(newSize);
try {
dispatchMatched();
} catch(Exception e) {
LOG.trace("Caught exception on dispatch after prefetch size change.");
}
}
public boolean isUseTopicSubscriptionInflightStats() {
return useTopicSubscriptionInflightStats;
}
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}
private static class DispatchedNode {
private final int size;
private final MessageId messageId;
private final Destination destination;
public DispatchedNode(final MessageReference node) {
super();
this.size = node.getSize();
this.messageId = node.getMessageId();
this.destination = node.getRegionDestination() instanceof Destination ?
((Destination)node.getRegionDestination()) : null;
}
public long getSize() {
return size;
}
public MessageId getMessageId() {
return messageId;
}
public Destination getDestination() {
return destination;
}
}
}