/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;

import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;

/**
 * The Topic is a destination that sends a copy of a message to every active
 * Subscription registered.
 */
public class Topic extends BaseDestination implements Task {
    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
    private final TopicMessageStore topicStore;
    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
    private final TaskRunner taskRunner;
    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
        @Override
        public void run() {
            try {
                Topic.this.taskRunner.wakeup();
            } catch (InterruptedException e) {
            }
        }
    };

    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
        super(brokerService, store, destination, parentStats);
        this.topicStore = store;
        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
    }

    @Override
    public void initialize() throws Exception {
        super.initialize();
        // set non default subscription recovery policy (override policyEntries)
        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
            setAlwaysRetroactive(true);
        }
        if (store != null) {
            // AMQ-2586: Better to leave this stat at zero than to give the user
            // misleading metrics.
            // int messageCount = store.getMessageCount();
            // destinationStatistics.getMessages().setCount(messageCount);
            store.start();
        }
    }

    @Override
    public List<Subscription> getConsumers() {
        synchronized (consumers) {
            return new ArrayList<Subscription>(consumers);
        }
    }

    public boolean lock(MessageReference node, LockOwner sub) {
        return true;
    }

    @Override
    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {

            // Do a retroactive recovery if needed.
            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {

                // synchronize with dispatch method so that no new messages are sent
                // while we are recovering a subscription to avoid out of order messages.
                dispatchLock.writeLock().lock();
                try {
                    boolean applyRecovery = false;
                    synchronized (consumers) {
                        if (!consumers.contains(sub)){
                            sub.add(context, this);
                            consumers.add(sub);
                            applyRecovery=true;
                            super.addSubscription(context, sub);
                        }
                    }
                    if (applyRecovery){
                        subscriptionRecoveryPolicy.recover(context, this, sub);
                    }
                } finally {
                    dispatchLock.writeLock().unlock();
                }

            } else {
                synchronized (consumers) {
                    if (!consumers.contains(sub)){
                        sub.add(context, this);
                        consumers.add(sub);
                        super.addSubscription(context, sub);
                    }
                }
            }
        } else {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            super.addSubscription(context, sub);
            sub.add(context, this);
            if(dsub.isActive()) {
                synchronized (consumers) {
                    boolean hasSubscription = false;

                    if (consumers.size() == 0) {
                        hasSubscription = false;
                    } else {
                        for (Subscription currentSub : consumers) {
                            if (currentSub.getConsumerInfo().isDurable()) {
                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
                                    hasSubscription = true;
                                    break;
                                }
                            }
                        }
                    }

                    if (!hasSubscription) {
                        consumers.add(sub);
                    }
                }
            }
            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
        }
    }

    @Override
    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {
            boolean removed = false;
            synchronized (consumers) {
                removed = consumers.remove(sub);
            }
            if (removed) {
                super.removeSubscription(context, sub, lastDeliveredSequenceId);
            }
        }
        sub.remove(context, this);
    }

    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
        if (topicStore != null) {
            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
            DurableTopicSubscription removed = durableSubscribers.remove(key);
            if (removed != null) {
                destinationStatistics.getConsumers().decrement();
                // deactivate and remove
                removed.deactivate(false, 0l);
                consumers.remove(removed);
            }
        }
    }

    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
        if (hasSelectorChanged(info1, info2)) {
            return true;
        }

        return hasNoLocalChanged(info1, info2);
    }

    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
        //Not all persistence adapters store the noLocal value for a subscription
        PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
        if (adapter instanceof NoLocalSubscriptionAware) {
            if (info1.isNoLocal() ^ info2.isNoLocal()) {
                return true;
            }
        }

        return false;
    }

    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
        if (info1.getSelector() != null ^ info2.getSelector() != null) {
            return true;
        }

        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
            return true;
        }

        return false;
    }

    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
        // synchronize with dispatch method so that no new messages are sent
        // while we are recovering a subscription to avoid out of order messages.
        dispatchLock.writeLock().lock();
        try {

            if (topicStore == null) {
                return;
            }

            // Recover the durable subscription.
            String clientId = subscription.getSubscriptionKey().getClientId();
            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
            if (info != null) {
                // Check to see if selector changed.
                if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
                    // Need to delete the subscription
                    topicStore.deleteSubscription(clientId, subscriptionName);
                    info = null;
                    // Force a rebuild of the selector chain for the subscription otherwise
                    // the stored subscription is updated but the selector expression is not
                    // and the subscription will not behave according to the new configuration.
                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
                    synchronized (consumers) {
                        consumers.remove(subscription);
                    }
                } else {
                    synchronized (consumers) {
                        if (!consumers.contains(subscription)) {
                            consumers.add(subscription);
                        }
                    }
                }
            }

            // Do we need to create the subscription?
            if (info == null) {
                info = new SubscriptionInfo();
                info.setClientId(clientId);
                info.setSelector(subscription.getConsumerInfo().getSelector());
                info.setSubscriptionName(subscriptionName);
                info.setDestination(getActiveMQDestination());
                info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
                // This destination is an actual destination id.
                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
                // This destination might be a pattern
                synchronized (consumers) {
                    consumers.add(subscription);
                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
                }
            }

            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
            msgContext.setDestination(destination);
            if (subscription.isRecoveryRequired()) {
                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        message.setRegionDestination(Topic.this);
                        try {
                            msgContext.setMessageReference(message);
                            if (subscription.matches(message, msgContext)) {
                                subscription.add(message);
                            }
                        } catch (IOException e) {
                            LOG.error("Failed to recover this message {}", message, e);
                        }
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        throw new RuntimeException("Should not be called.");
                    }

                    @Override
                    public boolean hasSpace() {
                        return true;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
            }
        } finally {
            dispatchLock.writeLock().unlock();
        }
    }

    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
        synchronized (consumers) {
            consumers.remove(sub);
        }
        sub.remove(context, this, dispatched);
    }

    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
        if (subscription.getConsumerInfo().isRetroactive()) {
            subscriptionRecoveryPolicy.recover(context, this, subscription);
        }
    }

    @Override
    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();

        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
        producerExchange.incrementSend();
        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                && !context.isInRecoveryMode();

        message.setRegionDestination(this);

        // There is delay between the client sending it and it arriving at the
        // destination.. it may have expired.
        if (message.isExpired()) {
            broker.messageExpired(context, message, null);
            getDestinationStatistics().getExpired().increment();
            if (sendProducerAck) {
                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                context.getConnection().dispatchAsync(ack);
            }
            return;
        }

        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);

            if (isProducerFlowControl() && context.isProducerFlowControl()) {

                if (isFlowControlLogRequired()) {
                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                }

                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // We can avoid blocking due to low usage if the producer is sending a sync message or
                // if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                    synchronized (messagesWaitingForSpace) {
                        messagesWaitingForSpace.add(new Runnable() {
                            @Override
                            public void run() {
                                try {

                                    // While waiting for space to free up...
                                    // the transaction may be done
                                    if (message.isInTransaction()) {
                                        if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
                                            throw new JMSException("Send transaction completed while waiting for space");
                                        }
                                    }

                                    // the message may have expired.
                                    if (message.isExpired()) {
                                        broker.messageExpired(context, message, null);
                                        getDestinationStatistics().getExpired().increment();
                                    } else {
                                        doMessageSend(producerExchange, message);
                                    }

                                    if (sendProducerAck) {
                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
                                                .getSize());
                                        context.getConnection().dispatchAsync(ack);
                                    } else {
                                        Response response = new Response();
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }

                                } catch (Exception e) {
                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
                                        ExceptionResponse response = new ExceptionResponse(e);
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }
                                }
                            }
                        });

                        registerCallbackForNotFullNotification();
                        context.setDontSendReponse(true);
                        return;
                    }

                } else {
                    // Producer flow control cannot be used, so we have do the flow control
                    // at the broker by blocking this thread until there is space available.

                    if (memoryUsage.isFull()) {
                        if (context.isInTransaction()) {

                            int count = 0;
                            while (!memoryUsage.waitForSpace(1000)) {
                                if (context.getStopping().get()) {
                                    throw new IOException("Connection closed, send aborted.");
                                }
                                if (count > 2 && context.isInTransaction()) {
                                    count = 0;
                                    int size = context.getTransaction().size();
                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
                                }
                                count++;
                            }
                        } else {
                            waitForSpace(
                                    context,
                                    producerExchange,
                                    memoryUsage,
                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
                                            + message.getProducerId()
                                            + ") to prevent flooding "
                                            + getActiveMQDestination().getQualifiedName()
                                            + "."
                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                        }
                    }

                    // The usage manager could have delayed us by the time
                    // we unblock the message could have expired..
                    if (message.isExpired()) {
                        getDestinationStatistics().getExpired().increment();
                        LOG.debug("Expired message: {}", message);
                        return;
                    }
                }
            }
        }

        doMessageSend(producerExchange, message);
        messageDelivered(context, message);
        if (sendProducerAck) {
            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
            context.getConnection().dispatchAsync(ack);
        }
    }

    /**
     * do send the message - this needs to be synchronized to ensure messages
     * are stored AND dispatched in the right order
     *
     * @param producerExchange
     * @param message
     * @throws IOException
     * @throws Exception
     */
    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
            throws IOException, Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
        Future<Object> result = null;

        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException(logMessage);
                }

                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
            }
            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());

            //Moved the reduceMemoryfootprint clearing to the dispatch method
        }

        message.incrementReferenceCount();

        if (context.isInTransaction() && (context.getTransaction() != null)) {
            context.getTransaction().addSynchronization(new Synchronization() {
                @Override
                public void afterCommit() throws Exception {
                    // It could take while before we receive the commit
                    // operation.. by that time the message could have
                    // expired..
                    if (message.isExpired()) {
                        if (broker.isExpired(message)) {
                            getDestinationStatistics().getExpired().increment();
                            broker.messageExpired(context, message, null);
                        }
                        message.decrementReferenceCount();
                        return;
                    }
                    try {
                        dispatch(context, message);
                    } finally {
                        message.decrementReferenceCount();
                    }
                }

                @Override
                public void afterRollback() throws Exception {
                    message.decrementReferenceCount();
                }
            });

        } else {
            try {
                dispatch(context, message);
            } finally {
                message.decrementReferenceCount();
            }
        }

        if (result != null && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }

    private boolean canOptimizeOutPersistence() {
        return durableSubscribers.size() == 0;
    }

    @Override
    public String toString() {
        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
    }

    @Override
    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
            final MessageReference node) throws IOException {
        if (topicStore != null && node.isPersistent()) {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            SubscriptionKey key = dsub.getSubscriptionKey();
            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
                    convertToNonRangedAck(ack, node));
        }
        messageConsumed(context, node);
    }

    @Override
    public void gc() {
    }

    public Message loadMessage(MessageId messageId) throws IOException {
        return topicStore != null ? topicStore.getMessage(messageId) : null;
    }

    @Override
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            this.subscriptionRecoveryPolicy.start();
            if (memoryUsage != null) {
                memoryUsage.start();
            }

            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
            }
        }
    }

    @Override
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            if (taskRunner != null) {
                taskRunner.shutdown();
            }
            this.subscriptionRecoveryPolicy.stop();
            if (memoryUsage != null) {
                memoryUsage.stop();
            }
            if (this.topicStore != null) {
                this.topicStore.stop();
            }

            scheduler.cancel(expireMessagesTask);
        }
    }

    @Override
    public Message[] browse() {
        final List<Message> result = new ArrayList<Message>();
        doBrowse(result, getMaxBrowsePageSize());
        return result.toArray(new Message[result.size()]);
    }

    private void doBrowse(final List<Message> browseList, final int max) {
        try {
            if (topicStore != null) {
                final List<Message> toExpire = new ArrayList<Message>();
                topicStore.recover(new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        if (message.isExpired()) {
                            toExpire.add(message);
                        }
                        browseList.add(message);
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        return true;
                    }

                    @Override
                    public boolean hasSpace() {
                        return browseList.size() < max;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
                final ConnectionContext connectionContext = createConnectionContext();
                for (Message message : toExpire) {
                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
                        if (!sub.isActive()) {
                            message.setRegionDestination(this);
                            messageExpired(connectionContext, sub, message);
                        }
                    }
                }
                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                if (msgs != null) {
                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
                        browseList.add(msgs[i]);
                    }
                }
            }
        } catch (Throwable e) {
            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
        }
    }

    @Override
    public boolean iterate() {
        synchronized (messagesWaitingForSpace) {
            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
                Runnable op = messagesWaitingForSpace.removeFirst();
                op.run();
            }

            if (!messagesWaitingForSpace.isEmpty()) {
                registerCallbackForNotFullNotification();
            }
        }
        return false;
    }

    private void registerCallbackForNotFullNotification() {
        // If the usage manager is not full, then the task will not
        // get called..
        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
            // so call it directly here.
            sendMessagesWaitingForSpaceTask.run();
        }
    }

    // Properties
    // -------------------------------------------------------------------------

    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return subscriptionRecoveryPolicy;
    }

    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
            // allow users to combine retained message policy with other ActiveMQ policies
            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
            policy.setWrapped(recoveryPolicy);
        } else {
            this.subscriptionRecoveryPolicy = recoveryPolicy;
        }
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    @Override
    public final void wakeup() {
    }

    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
        // AMQ-2586: Better to leave this stat at zero than to give the user
        // misleading metrics.
        // destinationStatistics.getMessages().increment();
        destinationStatistics.getEnqueues().increment();
        destinationStatistics.getMessageSize().addSize(message.getSize());
        MessageEvaluationContext msgContext = null;

        dispatchLock.readLock().lock();
        try {
            if (!subscriptionRecoveryPolicy.add(context, message)) {
                return;
            }
            synchronized (consumers) {
                if (consumers.isEmpty()) {
                    onMessageWithNoConsumers(context, message);
                    return;
                }
            }

            // Clear memory before dispatch - need to clear here because the call to
            //subscriptionRecoveryPolicy.add() will unmarshall the state
            if (isReduceMemoryFootprint() && message.isMarshalled()) {
                message.clearUnMarshalledState();
            }

            msgContext = context.getMessageEvaluationContext();
            msgContext.setDestination(destination);
            msgContext.setMessageReference(message);
            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
                onMessageWithNoConsumers(context, message);
            }

        } finally {
            dispatchLock.readLock().unlock();
            if (msgContext != null) {
                msgContext.clear();
            }
        }
    }

    private final Runnable expireMessagesTask = new Runnable() {
        @Override
        public void run() {
            List<Message> browsedMessages = new InsertionCountList<Message>();
            doBrowse(browsedMessages, getMaxExpirePageSize());
        }
    };

    @Override
    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
        broker.messageExpired(context, reference, subs);
        // AMQ-2586: Better to leave this stat at zero than to give the user
        // misleading metrics.
        // destinationStatistics.getMessages().decrement();
        destinationStatistics.getExpired().increment();
        MessageAck ack = new MessageAck();
        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
        ack.setDestination(destination);
        ack.setMessageID(reference.getMessageId());
        try {
            if (subs instanceof DurableTopicSubscription) {
                ((DurableTopicSubscription)subs).removePending(reference);
            }
            acknowledge(context, subs, ack, reference);
        } catch (Exception e) {
            LOG.error("Failed to remove expired Message from the store ", e);
        }
    }

    @Override
    protected Logger getLog() {
        return LOG;
    }

    protected boolean isOptimizeStorage(){
        boolean result = false;

        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
                result = true;
                for (DurableTopicSubscription s : durableSubscribers.values()) {
                    if (s.isActive()== false){
                        result = false;
                        break;
                    }
                    if (s.getPrefetchSize()==0){
                        result = false;
                        break;
                    }
                    if (s.isSlowConsumer()){
                        result = false;
                        break;
                    }
                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
                        result = false;
                        break;
                    }
                }
        }
        return result;
    }

    /**
     * force a reread of the store - after transaction recovery completion
     */
    @Override
    public void clearPendingMessages() {
        dispatchLock.readLock().lock();
        try {
            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
                clearPendingAndDispatch(durableTopicSubscription);
            }
        } finally {
            dispatchLock.readLock().unlock();
        }
    }

    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
        synchronized (durableTopicSubscription.pendingLock) {
            durableTopicSubscription.pending.clear();
            try {
                durableTopicSubscription.dispatchPending();
            } catch (IOException exception) {
                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}, exception: {}",
                        durableTopicSubscription,
                        destination,
                        durableTopicSubscription.pending,
                        exception);
            }
        }
    }

    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
        return durableSubscribers;
    }
}
