blob: 87df663df171d3dc75a672f706bd0f81ba1fef2c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
/**
* The Topic is a destination that sends a copy of a message to every active
* Subscription registered.
*/
public class Topic extends BaseDestination implements Task {
protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
private final TopicMessageStore topicStore;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@Override
public void run() {
try {
Topic.this.taskRunner.wakeup();
} catch (InterruptedException e) {
}
}
};
public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.topicStore = store;
subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
@Override
public void initialize() throws Exception {
super.initialize();
// set non default subscription recovery policy (override policyEntries)
if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
setAlwaysRetroactive(true);
}
if (store != null) {
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.
// int messageCount = store.getMessageCount();
// destinationStatistics.getMessages().setCount(messageCount);
store.start();
}
}
@Override
public List<Subscription> getConsumers() {
synchronized (consumers) {
return new ArrayList<Subscription>(consumers);
}
}
public boolean lock(MessageReference node, LockOwner sub) {
return true;
}
@Override
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
// Do a retroactive recovery if needed.
if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
// synchronize with dispatch method so that no new messages are sent
// while we are recovering a subscription to avoid out of order messages.
dispatchLock.writeLock().lock();
try {
boolean applyRecovery = false;
synchronized (consumers) {
if (!consumers.contains(sub)){
sub.add(context, this);
consumers.add(sub);
applyRecovery=true;
super.addSubscription(context, sub);
}
}
if (applyRecovery){
subscriptionRecoveryPolicy.recover(context, this, sub);
}
} finally {
dispatchLock.writeLock().unlock();
}
} else {
synchronized (consumers) {
if (!consumers.contains(sub)){
sub.add(context, this);
consumers.add(sub);
super.addSubscription(context, sub);
}
}
}
} else {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
super.addSubscription(context, sub);
sub.add(context, this);
if(dsub.isActive()) {
synchronized (consumers) {
boolean hasSubscription = false;
if (consumers.size() == 0) {
hasSubscription = false;
} else {
for (Subscription currentSub : consumers) {
if (currentSub.getConsumerInfo().isDurable()) {
DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
hasSubscription = true;
break;
}
}
}
}
if (!hasSubscription) {
consumers.add(sub);
}
}
}
durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
}
}
@Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
boolean removed = false;
synchronized (consumers) {
removed = consumers.remove(sub);
}
if (removed) {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
}
}
sub.remove(context, this);
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (topicStore != null) {
topicStore.deleteSubscription(key.clientId, key.subscriptionName);
DurableTopicSubscription removed = durableSubscribers.remove(key);
if (removed != null) {
destinationStatistics.getConsumers().decrement();
// deactivate and remove
removed.deactivate(false, 0l);
consumers.remove(removed);
}
}
}
private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
if (hasSelectorChanged(info1, info2)) {
return true;
}
return hasNoLocalChanged(info1, info2);
}
private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
//Not all persistence adapters store the noLocal value for a subscription
PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
if (adapter instanceof NoLocalSubscriptionAware) {
if (info1.isNoLocal() ^ info2.isNoLocal()) {
return true;
}
}
return false;
}
private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
if (info1.getSelector() != null ^ info2.getSelector() != null) {
return true;
}
if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
return true;
}
return false;
}
public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
// synchronize with dispatch method so that no new messages are sent
// while we are recovering a subscription to avoid out of order messages.
dispatchLock.writeLock().lock();
try {
if (topicStore == null) {
return;
}
// Recover the durable subscription.
String clientId = subscription.getSubscriptionKey().getClientId();
String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
if (info != null) {
// Check to see if selector changed.
if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
// Need to delete the subscription
topicStore.deleteSubscription(clientId, subscriptionName);
info = null;
// Force a rebuild of the selector chain for the subscription otherwise
// the stored subscription is updated but the selector expression is not
// and the subscription will not behave according to the new configuration.
subscription.setSelector(subscription.getConsumerInfo().getSelector());
synchronized (consumers) {
consumers.remove(subscription);
}
} else {
synchronized (consumers) {
if (!consumers.contains(subscription)) {
consumers.add(subscription);
}
}
}
}
// Do we need to create the subscription?
if (info == null) {
info = new SubscriptionInfo();
info.setClientId(clientId);
info.setSelector(subscription.getConsumerInfo().getSelector());
info.setSubscriptionName(subscriptionName);
info.setDestination(getActiveMQDestination());
info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
// This destination is an actual destination id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
// This destination might be a pattern
synchronized (consumers) {
consumers.add(subscription);
topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
}
}
final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
if (subscription.isRecoveryRequired()) {
topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
@Override
public boolean recoverMessage(Message message) throws Exception {
message.setRegionDestination(Topic.this);
try {
msgContext.setMessageReference(message);
if (subscription.matches(message, msgContext)) {
subscription.add(message);
}
} catch (IOException e) {
LOG.error("Failed to recover this message {}", message, e);
}
return true;
}
@Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
throw new RuntimeException("Should not be called.");
}
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean isDuplicate(MessageId id) {
return false;
}
});
}
} finally {
dispatchLock.writeLock().unlock();
}
}
public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
synchronized (consumers) {
consumers.remove(sub);
}
sub.remove(context, this, dispatched);
}
public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
if (subscription.getConsumerInfo().isRetroactive()) {
subscriptionRecoveryPolicy.recover(context, this, subscription);
}
}
@Override
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
producerExchange.incrementSend();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
message.setRegionDestination(this);
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message, null);
getDestinationStatistics().getExpired().increment();
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
if (memoryUsage.isFull()) {
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (isFlowControlLogRequired()) {
LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
}
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
+ memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
// We can avoid blocking due to low usage if the producer is sending a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
@Override
public void run() {
try {
// While waiting for space to free up...
// the transaction may be done
if (message.isInTransaction()) {
if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
throw new JMSException("Send transaction completed while waiting for space");
}
}
// the message may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message, null);
getDestinationStatistics().getExpired().increment();
} else {
doMessageSend(producerExchange, message);
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
}
}
});
registerCallbackForNotFullNotification();
context.setDontSendReponse(true);
return;
}
} else {
// Producer flow control cannot be used, so we have do the flow control
// at the broker by blocking this thread until there is space available.
if (memoryUsage.isFull()) {
if (context.isInTransaction()) {
int count = 0;
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
if (count > 2 && context.isInTransaction()) {
count = 0;
int size = context.getTransaction().size();
LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
}
count++;
}
} else {
waitForSpace(
context,
producerExchange,
memoryUsage,
"Usage Manager Memory Usage limit reached. Stopping producer ("
+ message.getProducerId()
+ ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName()
+ "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
getDestinationStatistics().getExpired().increment();
LOG.debug("Expired message: {}", message);
return;
}
}
}
}
doMessageSend(producerExchange, message);
messageDelivered(context, message);
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
}
/**
* do send the message - this needs to be synchronized to ensure messages
* are stored AND dispatched in the right order
*
* @param producerExchange
* @param message
* @throws IOException
* @throws Exception
*/
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
Future<Object> result = null;
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
}
waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
//Moved the reduceMemoryfootprint clearing to the dispatch method
}
message.incrementReferenceCount();
if (context.isInTransaction() && (context.getTransaction() != null)) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
// It could take while before we receive the commit
// operation.. by that time the message could have
// expired..
if (message.isExpired()) {
if (broker.isExpired(message)) {
getDestinationStatistics().getExpired().increment();
broker.messageExpired(context, message, null);
}
message.decrementReferenceCount();
return;
}
try {
dispatch(context, message);
} finally {
message.decrementReferenceCount();
}
}
@Override
public void afterRollback() throws Exception {
message.decrementReferenceCount();
}
});
} else {
try {
dispatch(context, message);
} finally {
message.decrementReferenceCount();
}
}
if (result != null && !result.isCancelled()) {
try {
result.get();
} catch (CancellationException e) {
// ignore - the task has been cancelled if the message
// has already been deleted
}
}
}
private boolean canOptimizeOutPersistence() {
return durableSubscribers.size() == 0;
}
@Override
public String toString() {
return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
}
@Override
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
convertToNonRangedAck(ack, node));
}
messageConsumed(context, node);
}
@Override
public void gc() {
}
public Message loadMessage(MessageId messageId) throws IOException {
return topicStore != null ? topicStore.getMessage(messageId) : null;
}
@Override
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
this.subscriptionRecoveryPolicy.start();
if (memoryUsage != null) {
memoryUsage.start();
}
if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
}
}
@Override
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
if (taskRunner != null) {
taskRunner.shutdown();
}
this.subscriptionRecoveryPolicy.stop();
if (memoryUsage != null) {
memoryUsage.stop();
}
if (this.topicStore != null) {
this.topicStore.stop();
}
scheduler.cancel(expireMessagesTask);
}
}
@Override
public Message[] browse() {
final List<Message> result = new ArrayList<Message>();
doBrowse(result, getMaxBrowsePageSize());
return result.toArray(new Message[result.size()]);
}
private void doBrowse(final List<Message> browseList, final int max) {
try {
if (topicStore != null) {
final List<Message> toExpire = new ArrayList<Message>();
topicStore.recover(new MessageRecoveryListener() {
@Override
public boolean recoverMessage(Message message) throws Exception {
if (message.isExpired()) {
toExpire.add(message);
}
browseList.add(message);
return true;
}
@Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
return true;
}
@Override
public boolean hasSpace() {
return browseList.size() < max;
}
@Override
public boolean isDuplicate(MessageId id) {
return false;
}
});
final ConnectionContext connectionContext = createConnectionContext();
for (Message message : toExpire) {
for (DurableTopicSubscription sub : durableSubscribers.values()) {
if (!sub.isActive()) {
message.setRegionDestination(this);
messageExpired(connectionContext, sub, message);
}
}
}
Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
if (msgs != null) {
for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
browseList.add(msgs[i]);
}
}
}
} catch (Throwable e) {
LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
}
}
@Override
public boolean iterate() {
synchronized (messagesWaitingForSpace) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
if (!messagesWaitingForSpace.isEmpty()) {
registerCallbackForNotFullNotification();
}
}
return false;
}
private void registerCallbackForNotFullNotification() {
// If the usage manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
}
// Properties
// -------------------------------------------------------------------------
public DispatchPolicy getDispatchPolicy() {
return dispatchPolicy;
}
public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
this.dispatchPolicy = dispatchPolicy;
}
public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
return subscriptionRecoveryPolicy;
}
public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
// allow users to combine retained message policy with other ActiveMQ policies
RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
policy.setWrapped(recoveryPolicy);
} else {
this.subscriptionRecoveryPolicy = recoveryPolicy;
}
}
// Implementation methods
// -------------------------------------------------------------------------
@Override
public final void wakeup() {
}
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessageSize().addSize(message.getSize());
MessageEvaluationContext msgContext = null;
dispatchLock.readLock().lock();
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
}
synchronized (consumers) {
if (consumers.isEmpty()) {
onMessageWithNoConsumers(context, message);
return;
}
}
// Clear memory before dispatch - need to clear here because the call to
//subscriptionRecoveryPolicy.add() will unmarshall the state
if (isReduceMemoryFootprint() && message.isMarshalled()) {
message.clearUnMarshalledState();
}
msgContext = context.getMessageEvaluationContext();
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
onMessageWithNoConsumers(context, message);
}
} finally {
dispatchLock.readLock().unlock();
if (msgContext != null) {
msgContext.clear();
}
}
}
private final Runnable expireMessagesTask = new Runnable() {
@Override
public void run() {
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, getMaxExpirePageSize());
}
};
@Override
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference, subs);
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.
// destinationStatistics.getMessages().decrement();
destinationStatistics.getExpired().increment();
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
ack.setMessageID(reference.getMessageId());
try {
if (subs instanceof DurableTopicSubscription) {
((DurableTopicSubscription)subs).removePending(reference);
}
acknowledge(context, subs, ack, reference);
} catch (Exception e) {
LOG.error("Failed to remove expired Message from the store ", e);
}
}
@Override
protected Logger getLog() {
return LOG;
}
protected boolean isOptimizeStorage(){
boolean result = false;
if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
result = true;
for (DurableTopicSubscription s : durableSubscribers.values()) {
if (s.isActive()== false){
result = false;
break;
}
if (s.getPrefetchSize()==0){
result = false;
break;
}
if (s.isSlowConsumer()){
result = false;
break;
}
if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
result = false;
break;
}
}
}
return result;
}
/**
* force a reread of the store - after transaction recovery completion
*/
@Override
public void clearPendingMessages() {
dispatchLock.readLock().lock();
try {
for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
clearPendingAndDispatch(durableTopicSubscription);
}
} finally {
dispatchLock.readLock().unlock();
}
}
private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
synchronized (durableTopicSubscription.pendingLock) {
durableTopicSubscription.pending.clear();
try {
durableTopicSubscription.dispatchPending();
} catch (IOException exception) {
LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}, exception: {}",
durableTopicSubscription,
destination,
durableTopicSubscription.pending,
exception);
}
}
}
public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
return durableSubscribers;
}
}