blob: 2fc7246090ea9d84e207f319d1361d2fe6304f65 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
*
*
*/
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);
private static final int UNKNOWN = -1;
private final String clientId;
private final String subscriberName;
private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
private final PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
private final DurableTopicSubscription subscription;
private int cacheCurrentLowestPriority = UNKNOWN;
private boolean immediatePriorityDispatch = true;
/**
* @param broker Broker for this cursor
* @param clientId clientId for this cursor
* @param subscriberName subscriber name for this cursor
* @param maxBatchSize currently ignored
* @param subscription subscription for this cursor
*/
public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) {
super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
this.subscription=subscription;
this.clientId = clientId;
this.subscriberName = subscriberName;
if (broker.getBrokerService().isPersistent()) {
this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
} else {
this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
}
this.nonPersistent.setMaxBatchSize(maxBatchSize);
this.nonPersistent.setSystemUsage(systemUsage);
this.storePrefetches.add(this.nonPersistent);
if (prioritizedMessages) {
setMaxAuditDepth(10*getMaxAuditDepth());
}
}
@Override
public synchronized void start() throws Exception {
if (!isStarted()) {
super.start();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.setMessageAudit(getMessageAudit());
tsp.start();
}
}
}
@Override
public synchronized void stop() throws Exception {
if (isStarted()) {
if (subscription.isKeepDurableSubsActive()) {
super.gc();
super.getMessageAudit().clear();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
tsp.getMessageAudit().clear();
}
} else {
super.stop();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.stop();
}
}
}
}
/**
* Add a destination
*
* @param context
* @param destination
* @throws Exception
*/
@Override
public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
tsp.setMaxBatchSize(destination.getMaxPageSize());
tsp.setSystemUsage(systemUsage);
tsp.setMessageAudit(getMessageAudit());
tsp.setEnableAudit(isEnableAudit());
tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (isStarted()) {
tsp.start();
}
}
}
/**
* remove a destination
*
* @param context
* @param destination
* @throws Exception
*/
@Override
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
PendingMessageCursor tsp = topics.remove(destination);
if (tsp != null) {
storePrefetches.remove(tsp);
}
return Collections.EMPTY_LIST;
}
/**
* @return true if there are no pending messages
*/
@Override
public synchronized boolean isEmpty() {
for (PendingMessageCursor tsp : storePrefetches) {
if( !tsp.isEmpty() )
return false;
}
return true;
}
@Override
public synchronized boolean isEmpty(Destination destination) {
boolean result = true;
TopicStorePrefetch tsp = topics.get(destination);
if (tsp != null) {
result = tsp.isEmpty();
}
return result;
}
/**
* Informs the Broker if the subscription needs to intervention to recover
* it's state e.g. DurableTopicSubscriber may do
*
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
* @return true if recovery required
*/
@Override
public boolean isRecoveryRequired() {
return false;
}
@Override
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
if (isStarted()) {
if (!msg.isPersistent()) {
nonPersistent.addMessageLast(node);
}
}
if (msg.isPersistent()) {
Destination dest = msg.getRegionDestination();
TopicStorePrefetch tsp = topics.get(dest);
if (tsp != null) {
// cache can become high priority cache for immediate dispatch
final int priority = msg.getPriority();
if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
if (priority > tsp.getCurrentLowestPriority()) {
if (LOG.isTraceEnabled()) {
LOG.trace("enabling cache for cursor on high priority message " + priority
+ ", current lowest: " + tsp.getCurrentLowestPriority());
}
tsp.setCacheEnabled(true);
cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
}
} else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
// go to the store to get next priority message as lower priority messages may be recovered
// already and need to acked sequence order
if (LOG.isTraceEnabled()) {
LOG.trace("disabling/clearing cache for cursor on lower priority message "
+ priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
+ " cache lowest: " + cacheCurrentLowestPriority);
}
tsp.setCacheEnabled(false);
cacheCurrentLowestPriority = UNKNOWN;
}
tsp.addMessageLast(node);
}
}
}
}
@Override
public boolean isTransient() {
return subscription.isKeepDurableSubsActive();
}
@Override
public void addMessageFirst(MessageReference node) throws Exception {
// for keep durable subs active, need to deal with redispatch
if (node != null) {
Message msg = node.getMessage();
if (!msg.isPersistent()) {
nonPersistent.addMessageFirst(node);
} else {
Destination dest = msg.getRegionDestination();
TopicStorePrefetch tsp = topics.get(dest);
if (tsp != null) {
tsp.addMessageFirst(node);
}
}
}
}
@Override
public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
nonPersistent.addMessageLast(node);
}
@Override
public synchronized void clear() {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.clear();
}
}
@Override
public synchronized boolean hasNext() {
boolean result = true;
if (result) {
try {
currentCursor = getNextCursor();
} catch (Exception e) {
LOG.error("Failed to get current cursor ", e);
throw new RuntimeException(e);
}
result = currentCursor != null ? currentCursor.hasNext() : false;
}
return result;
}
@Override
public synchronized MessageReference next() {
MessageReference result = currentCursor != null ? currentCursor.next() : null;
return result;
}
@Override
public synchronized void remove() {
if (currentCursor != null) {
currentCursor.remove();
}
}
@Override
public synchronized void remove(MessageReference node) {
if (currentCursor != null) {
currentCursor.remove(node);
}
}
@Override
public synchronized void reset() {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.reset();
}
}
@Override
public synchronized void release() {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.release();
}
}
@Override
public synchronized int size() {
int pendingCount=0;
for (PendingMessageCursor tsp : storePrefetches) {
pendingCount += tsp.size();
}
return pendingCount;
}
@Override
public void setMaxBatchSize(int newMaxBatchSize) {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.setMaxBatchSize(newMaxBatchSize);
}
super.setMaxBatchSize(newMaxBatchSize);
}
@Override
public synchronized void gc() {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
}
cacheCurrentLowestPriority = UNKNOWN;
}
@Override
public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
for (PendingMessageCursor tsp : storePrefetches) {
tsp.setSystemUsage(usageManager);
}
}
@Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
for (PendingMessageCursor cursor : storePrefetches) {
cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
}
}
@Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
super.setMaxProducersToAudit(maxProducersToAudit);
for (PendingMessageCursor cursor : storePrefetches) {
cursor.setMaxAuditDepth(maxAuditDepth);
}
}
@Override
public void setMaxAuditDepth(int maxAuditDepth) {
super.setMaxAuditDepth(maxAuditDepth);
for (PendingMessageCursor cursor : storePrefetches) {
cursor.setMaxAuditDepth(maxAuditDepth);
}
}
@Override
public void setEnableAudit(boolean enableAudit) {
super.setEnableAudit(enableAudit);
for (PendingMessageCursor cursor : storePrefetches) {
cursor.setEnableAudit(enableAudit);
}
}
@Override
public void setUseCache(boolean useCache) {
super.setUseCache(useCache);
for (PendingMessageCursor cursor : storePrefetches) {
cursor.setUseCache(useCache);
}
}
protected synchronized PendingMessageCursor getNextCursor() throws Exception {
if (currentCursor == null || currentCursor.isEmpty()) {
currentCursor = null;
for (PendingMessageCursor tsp : storePrefetches) {
if (tsp.hasNext()) {
currentCursor = tsp;
break;
}
}
// round-robin
if (storePrefetches.size()>1) {
PendingMessageCursor first = storePrefetches.remove(0);
storePrefetches.add(first);
}
}
return currentCursor;
}
@Override
public String toString() {
return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
}
public boolean isImmediatePriorityDispatch() {
return immediatePriorityDispatch;
}
public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
this.immediatePriorityDispatch = immediatePriorityDispatch;
}
}