/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;

import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <P>
 * A <CODE>Session</CODE> object is a single-threaded context for producing
 * and consuming messages. Although it may allocate provider resources outside
 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
 * <P>
 * A session serves several purposes:
 * <UL>
 * <LI>It is a factory for its message producers and consumers.
 * <LI>It supplies provider-optimized message factories.
 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
 * <CODE>TemporaryQueues</CODE>.
 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
 * objects for those clients that need to dynamically manipulate
 * provider-specific destination names.
 * <LI>It supports a single series of transactions that combine work spanning
 * its producers and consumers into atomic units.
 * <LI>It defines a serial order for the messages it consumes and the messages
 * it produces.
 * <LI>It retains messages it consumes until they have been acknowledged.
 * <LI>It serializes execution of message listeners registered with its message
 * consumers.
 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
 * </UL>
 * <P>
 * A session can create and service multiple message producers and consumers.
 * <P>
 * One typical use is to have a thread block on a synchronous
 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
 * <P>
 * If a client desires to have one thread produce messages while others consume
 * them, the client should use a separate session for its producing thread.
 * <P>
 * Once a connection has been started, any session with one or more registered
 * message listeners is dedicated to the thread of control that delivers
 * messages to it. It is erroneous for client code to use this session or any of
 * its constituent objects from another thread of control. The only exception to
 * this rule is the use of the session or connection <CODE>close</CODE>
 * method.
 * <P>
 * It should be easy for most clients to partition their work naturally into
 * sessions. This model allows clients to start simply and incrementally add
 * message processing complexity as their need for concurrency grows.
 * <P>
 * The <CODE>close</CODE> method is the only session method that can be called
 * while some other session method is being executed in another thread.
 * <P>
 * A session may be specified as transacted. Each transacted session supports a
 * single series of transactions. Each transaction groups a set of message sends
 * and a set of message receives into an atomic unit of work. In effect,
 * transactions organize a session's input message stream and output message
 * stream into series of atomic units. When a transaction commits, its atomic
 * unit of input is acknowledged and its associated atomic unit of output is
 * sent. If a transaction rollback is done, the transaction's sent messages are
 * destroyed and the session's input is automatically recovered.
 * <P>
 * The content of a transaction's input and output units is simply those
 * messages that have been produced and consumed within the session's current
 * transaction.
 * <P>
 * A transaction is completed using either its session's <CODE>commit</CODE>
 * method or its session's <CODE>rollback </CODE> method. The completion of a
 * session's current transaction automatically begins the next. The result is
 * that a transacted session always has a current transaction within which its
 * work is done.
 * <P>
 * The Java Transaction Service (JTS) or some other transaction monitor may be
 * used to combine a session's transaction with transactions on other resources
 * (databases, other JMS sessions, etc.). Since Java distributed transactions
 * are controlled via the Java Transaction API (JTA), use of the session's
 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
 * prohibited.
 * <P>
 * The JMS API does not require support for JTA; however, it does define how a
 * provider supplies this support.
 * <P>
 * Although it is also possible for a JMS client to handle distributed
 * transactions directly, it is unlikely that many JMS clients will do this.
 * Support for JTA in the JMS API is targeted at systems vendors who will be
 * integrating the JMS API into their application server products.
 *
 *
 * @see javax.jms.Session
 * @see javax.jms.QueueSession
 * @see javax.jms.TopicSession
 * @see javax.jms.XASession
 */
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {

    /**
     * Only acknowledge an individual message - using message.acknowledge()
     * as opposed to CLIENT_ACKNOWLEDGE which
     * acknowledges all messages consumed by a session at when acknowledge()
     * is called
     */
    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;

    public static interface DeliveryListener {
        void beforeDelivery(ActiveMQSession session, Message msg);

        void afterDelivery(ActiveMQSession session, Message msg);
    }

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
    private final ThreadPoolExecutor connectionExecutor;

    protected int acknowledgementMode;
    protected final ActiveMQConnection connection;
    protected final SessionInfo info;
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
    protected final ActiveMQSessionExecutor executor;
    protected final AtomicBoolean started = new AtomicBoolean(false);

    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();

    protected boolean closed;
    private volatile boolean synchronizationRegistered;
    protected boolean asyncDispatch;
    protected boolean sessionAsyncDispatch;
    protected final boolean debug;
    protected final Object sendMutex = new Object();
    protected final Object redeliveryGuard = new Object();

    private final AtomicBoolean clearInProgress = new AtomicBoolean();

    private MessageListener messageListener;
    private final JMSSessionStatsImpl stats;
    private TransactionContext transactionContext;
    private DeliveryListener deliveryListener;
    private MessageTransformer transformer;
    private BlobTransferPolicy blobTransferPolicy;
    private long lastDeliveredSequenceId = -2;

    /**
     * Construct the Session
     *
     * @param connection
     * @param sessionId
     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
     *                Session.SESSION_TRANSACTED
     * @param asyncDispatch
     * @param sessionAsyncDispatch
     * @throws JMSException on internal error
     */
    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
        this.debug = LOG.isDebugEnabled();
        this.connection = connection;
        this.acknowledgementMode = acknowledgeMode;
        this.asyncDispatch = asyncDispatch;
        this.sessionAsyncDispatch = sessionAsyncDispatch;
        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
        setTransactionContext(new TransactionContext(connection));
        stats = new JMSSessionStatsImpl(producers, consumers);
        this.connection.asyncSendPacket(info);
        setTransformer(connection.getTransformer());
        setBlobTransferPolicy(connection.getBlobTransferPolicy());
        this.connectionExecutor=connection.getExecutor();
        this.executor = new ActiveMQSessionExecutor(this);
        connection.addSession(this);
        if (connection.isStarted()) {
            start();
        }

    }

    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
    }

    /**
     * Sets the transaction context of the session.
     *
     * @param transactionContext - provides the means to control a JMS
     *                transaction.
     */
    public void setTransactionContext(TransactionContext transactionContext) {
        this.transactionContext = transactionContext;
    }

    /**
     * Returns the transaction context of the session.
     *
     * @return transactionContext - session's transaction context.
     */
    public TransactionContext getTransactionContext() {
        return transactionContext;
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.activemq.management.StatsCapable#getStats()
     */
    @Override
    public StatsImpl getStats() {
        return stats;
    }

    /**
     * Returns the session's statistics.
     *
     * @return stats - session's statistics.
     */
    public JMSSessionStatsImpl getSessionStats() {
        return stats;
    }

    /**
     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
     * object is used to send a message containing a stream of uninterpreted
     * bytes.
     *
     * @return the an ActiveMQBytesMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public BytesMessage createBytesMessage() throws JMSException {
        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
        configureMessage(message);
        return message;
    }

    /**
     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
     * object is used to send a self-defining set of name-value pairs, where
     * names are <CODE>String</CODE> objects and values are primitive values
     * in the Java programming language.
     *
     * @return an ActiveMQMapMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public MapMessage createMapMessage() throws JMSException {
        ActiveMQMapMessage message = new ActiveMQMapMessage();
        configureMessage(message);
        return message;
    }

    /**
     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
     * interface is the root interface of all JMS messages. A
     * <CODE>Message</CODE> object holds all the standard message header
     * information. It can be sent when a message containing only header
     * information is sufficient.
     *
     * @return an ActiveMQMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public Message createMessage() throws JMSException {
        ActiveMQMessage message = new ActiveMQMessage();
        configureMessage(message);
        return message;
    }

    /**
     * Creates an <CODE>ObjectMessage</CODE> object. An
     * <CODE>ObjectMessage</CODE> object is used to send a message that
     * contains a serializable Java object.
     *
     * @return an ActiveMQObjectMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public ObjectMessage createObjectMessage() throws JMSException {
        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        configureMessage(message);
        return message;
    }

    /**
     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
     * <CODE>ObjectMessage</CODE> object is used to send a message that
     * contains a serializable Java object.
     *
     * @param object the object to use to initialize this message
     * @return an ActiveMQObjectMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        configureMessage(message);
        message.setObject(object);
        return message;
    }

    /**
     * Creates a <CODE>StreamMessage</CODE> object. A
     * <CODE>StreamMessage</CODE> object is used to send a self-defining
     * stream of primitive values in the Java programming language.
     *
     * @return an ActiveMQStreamMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public StreamMessage createStreamMessage() throws JMSException {
        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
        configureMessage(message);
        return message;
    }

    /**
     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
     * object is used to send a message containing a <CODE>String</CODE>
     * object.
     *
     * @return an ActiveMQTextMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public TextMessage createTextMessage() throws JMSException {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        configureMessage(message);
        return message;
    }

    /**
     * Creates an initialized <CODE>TextMessage</CODE> object. A
     * <CODE>TextMessage</CODE> object is used to send a message containing a
     * <CODE>String</CODE>.
     *
     * @param text the string used to initialize this message
     * @return an ActiveMQTextMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    @Override
    public TextMessage createTextMessage(String text) throws JMSException {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(text);
        configureMessage(message);
        return message;
    }

    /**
     * Creates an initialized <CODE>BlobMessage</CODE> object. A
     * <CODE>BlobMessage</CODE> object is used to send a message containing a
     * <CODE>URL</CODE> which points to some network addressible BLOB.
     *
     * @param url the network addressable URL used to pass directly to the
     *                consumer
     * @return a BlobMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    public BlobMessage createBlobMessage(URL url) throws JMSException {
        return createBlobMessage(url, false);
    }

    /**
     * Creates an initialized <CODE>BlobMessage</CODE> object. A
     * <CODE>BlobMessage</CODE> object is used to send a message containing a
     * <CODE>URL</CODE> which points to some network addressible BLOB.
     *
     * @param url the network addressable URL used to pass directly to the
     *                consumer
     * @param deletedByBroker indicates whether or not the resource is deleted
     *                by the broker when the message is acknowledged
     * @return a BlobMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
        configureMessage(message);
        message.setURL(url);
        message.setDeletedByBroker(deletedByBroker);
        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
        return message;
    }

    /**
     * Creates an initialized <CODE>BlobMessage</CODE> object. A
     * <CODE>BlobMessage</CODE> object is used to send a message containing
     * the <CODE>File</CODE> content. Before the message is sent the file
     * conent will be uploaded to the broker or some other remote repository
     * depending on the {@link #getBlobTransferPolicy()}.
     *
     * @param file the file to be uploaded to some remote repo (or the broker)
     *                depending on the strategy
     * @return a BlobMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    public BlobMessage createBlobMessage(File file) throws JMSException {
        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
        configureMessage(message);
        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
        message.setDeletedByBroker(true);
        message.setName(file.getName());
        return message;
    }

    /**
     * Creates an initialized <CODE>BlobMessage</CODE> object. A
     * <CODE>BlobMessage</CODE> object is used to send a message containing
     * the <CODE>File</CODE> content. Before the message is sent the file
     * conent will be uploaded to the broker or some other remote repository
     * depending on the {@link #getBlobTransferPolicy()}. <br/>
     * <p>
     * The caller of this method is responsible for closing the
     * input stream that is used, however the stream can not be closed
     * until <b>after</b> the message has been sent.  To have this class
     * manage the stream and close it automatically, use the method
     * {@link ActiveMQSession#createBlobMessage(File)}
     *
     * @param in the stream to be uploaded to some remote repo (or the broker)
     *                depending on the strategy
     * @return a BlobMessage
     * @throws JMSException if the JMS provider fails to create this message due
     *                 to some internal error.
     */
    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
        configureMessage(message);
        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
        message.setDeletedByBroker(true);
        return message;
    }

    /**
     * Indicates whether the session is in transacted mode.
     *
     * @return true if the session is in transacted mode
     * @throws JMSException if there is some internal error.
     */
    @Override
    public boolean getTransacted() throws JMSException {
        checkClosed();
        return isTransacted();
    }

    /**
     * Returns the acknowledgement mode of the session. The acknowledgement mode
     * is set at the time that the session is created. If the session is
     * transacted, the acknowledgement mode is ignored.
     *
     * @return If the session is not transacted, returns the current
     *         acknowledgement mode for the session. If the session is
     *         transacted, returns SESSION_TRANSACTED.
     * @throws JMSException
     * @see javax.jms.Connection#createSession(boolean,int)
     * @since 1.1 exception JMSException if there is some internal error.
     */
    @Override
    public int getAcknowledgeMode() throws JMSException {
        checkClosed();
        return this.acknowledgementMode;
    }

    /**
     * Commits all messages done in this transaction and releases any locks
     * currently held.
     *
     * @throws JMSException if the JMS provider fails to commit the transaction
     *                 due to some internal error.
     * @throws TransactionRolledBackException if the transaction is rolled back
     *                 due to some internal error during commit.
     * @throws javax.jms.IllegalStateException if the method is not called by a
     *                 transacted session.
     */
    @Override
    public void commit() throws JMSException {
        checkClosed();
        if (!getTransacted()) {
            throw new javax.jms.IllegalStateException("Not a transacted session");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
        }
        transactionContext.commit();
    }

    /**
     * Rolls back any messages done in this transaction and releases any locks
     * currently held.
     *
     * @throws JMSException if the JMS provider fails to roll back the
     *                 transaction due to some internal error.
     * @throws javax.jms.IllegalStateException if the method is not called by a
     *                 transacted session.
     */
    @Override
    public void rollback() throws JMSException {
        checkClosed();
        if (!getTransacted()) {
            throw new javax.jms.IllegalStateException("Not a transacted session");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
        }
        transactionContext.rollback();
    }

    /**
     * Closes the session.
     * <P>
     * Since a provider may allocate some resources on behalf of a session
     * outside the JVM, clients should close the resources when they are not
     * needed. Relying on garbage collection to eventually reclaim these
     * resources may not be timely enough.
     * <P>
     * There is no need to close the producers and consumers of a closed
     * session.
     * <P>
     * This call will block until a <CODE>receive</CODE> call or message
     * listener in progress has completed. A blocked message consumer
     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
     * is closed.
     * <P>
     * Closing a transacted session must roll back the transaction in progress.
     * <P>
     * This method is the only <CODE>Session</CODE> method that can be called
     * concurrently.
     * <P>
     * Invoking any other <CODE>Session</CODE> method on a closed session must
     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
     * closed session must <I>not </I> throw an exception.
     *
     * @throws JMSException if the JMS provider fails to close the session due
     *                 to some internal error.
     */
    @Override
    public void close() throws JMSException {
        if (!closed) {
            if (getTransactionContext().isInXATransaction()) {
                if (!synchronizationRegistered) {
                    synchronizationRegistered = true;
                    getTransactionContext().addSynchronization(new Synchronization() {

                                        @Override
                                        public void afterCommit() throws Exception {
                                            doClose();
                                            synchronizationRegistered = false;
                                        }

                                        @Override
                                        public void afterRollback() throws Exception {
                                            doClose();
                                            synchronizationRegistered = false;
                                        }
                                    });
                }

            } else {
                doClose();
            }
        }
    }

    private void doClose() throws JMSException {
        dispose();
        RemoveInfo removeCommand = info.createRemoveCommand();
        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
        connection.asyncSendPacket(removeCommand);
    }

    final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
    void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
        clearRequestsCounter.incrementAndGet();
        executor.clearMessagesInProgress();
        // we are called from inside the transport reconnection logic which involves us
        // clearing all the connections' consumers dispatch and delivered lists. So rather
        // than trying to grab a mutex (which could be already owned by the message listener
        // calling the send or an ack) we allow it to complete in a separate thread via the
        // scheduler and notify us via connection.transportInterruptionProcessingComplete()
        //
        // We must be careful though not to allow multiple calls to this method from a
        // connection that is having issue becoming fully established from causing a large
        // build up of scheduled tasks to clear the same consumers over and over.
        if (consumers.isEmpty()) {
            return;
        }

        if (clearInProgress.compareAndSet(false, true)) {
            for (final ActiveMQMessageConsumer consumer : consumers) {
                consumer.inProgressClearRequired();
                transportInterruptionProcessingComplete.incrementAndGet();
                try {
                    connection.getScheduler().executeAfterDelay(new Runnable() {
                        @Override
                        public void run() {
                            consumer.clearMessagesInProgress();
                        }}, 0l);
                } catch (JMSException e) {
                    connection.onClientInternalException(e);
                }
            }

            try {
                connection.getScheduler().executeAfterDelay(new Runnable() {
                    @Override
                    public void run() {
                        clearInProgress.set(false);
                    }}, 0l);
            } catch (JMSException e) {
                connection.onClientInternalException(e);
            }
        }
    }

    void deliverAcks() {
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer consumer = iter.next();
            consumer.deliverAcks();
        }
    }

    public synchronized void dispose() throws JMSException {
        if (!closed) {

            try {
                executor.close();

                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
                    ActiveMQMessageConsumer consumer = iter.next();
                    consumer.setFailureError(connection.getFirstFailureError());
                    consumer.dispose();
                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
                }
                consumers.clear();

                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
                    ActiveMQMessageProducer producer = iter.next();
                    producer.dispose();
                }
                producers.clear();

                try {
                    if (getTransactionContext().isInLocalTransaction()) {
                        rollback();
                    }
                } catch (JMSException e) {
                }

            } finally {
                connection.removeSession(this);
                closed = true;
                this.transactionContext = null;
            }
        }
    }

    /**
     * Checks that the session is not closed then configures the message
     */
    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
        checkClosed();
        message.setConnection(connection);
    }

    /**
     * Check if the session is closed. It is used for ensuring that the session
     * is open before performing various operations.
     *
     * @throws IllegalStateException if the Session is closed
     */
    protected void checkClosed() throws IllegalStateException {
        if (closed) {
            throw new IllegalStateException("The Session is closed");
        }
    }

    /**
     * Checks if the session is closed.
     *
     * @return true if the session is closed, false otherwise.
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * Stops message delivery in this session, and restarts message delivery
     * with the oldest unacknowledged message.
     * <P>
     * All consumers deliver messages in a serial order. Acknowledging a
     * received message automatically acknowledges all messages that have been
     * delivered to the client.
     * <P>
     * Restarting a session causes it to take the following actions:
     * <UL>
     * <LI>Stop message delivery
     * <LI>Mark all messages that might have been delivered but not
     * acknowledged as "redelivered"
     * <LI>Restart the delivery sequence including all unacknowledged messages
     * that had been previously delivered. Redelivered messages do not have to
     * be delivered in exactly their original delivery order.
     * </UL>
     *
     * @throws JMSException if the JMS provider fails to stop and restart
     *                 message delivery due to some internal error.
     * @throws IllegalStateException if the method is called by a transacted
     *                 session.
     */
    @Override
    public void recover() throws JMSException {

        checkClosed();
        if (getTransacted()) {
            throw new IllegalStateException("This session is transacted");
        }

        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.rollback();
        }

    }

    /**
     * Returns the session's distinguished message listener (optional).
     *
     * @return the message listener associated with this session
     * @throws JMSException if the JMS provider fails to get the message
     *                 listener due to an internal error.
     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
     * @see javax.jms.ServerSessionPool
     * @see javax.jms.ServerSession
     */
    @Override
    public MessageListener getMessageListener() throws JMSException {
        checkClosed();
        return this.messageListener;
    }

    /**
     * Sets the session's distinguished message listener (optional).
     * <P>
     * When the distinguished message listener is set, no other form of message
     * receipt in the session can be used; however, all forms of sending
     * messages are still supported.
     * <P>
     * If this session has been closed, then an {@link IllegalStateException} is
     * thrown, if trying to set a new listener. However setting the listener
     * to <tt>null</tt> is allowed, to clear the listener, even if this session
     * has been closed prior.
     * <P>
     * This is an expert facility not used by regular JMS clients.
     *
     * @param listener the message listener to associate with this session
     * @throws JMSException if the JMS provider fails to set the message
     *                 listener due to an internal error.
     * @see javax.jms.Session#getMessageListener()
     * @see javax.jms.ServerSessionPool
     * @see javax.jms.ServerSession
     */
    @Override
    public void setMessageListener(MessageListener listener) throws JMSException {
        // only check for closed if we set a new listener, as we allow to clear
        // the listener, such as when an application is shutting down, and is
        // no longer using a message listener on this session
        if (listener != null) {
            checkClosed();
        }
        this.messageListener = listener;

        if (listener != null) {
            executor.setDispatchedBySessionPool(true);
        }
    }

    /**
     * Optional operation, intended to be used only by Application Servers, not
     * by ordinary JMS clients.
     *
     * @see javax.jms.ServerSession
     */
    @Override
    public void run() {
        MessageDispatch messageDispatch;
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            final MessageDispatch md = messageDispatch;

            // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy();
            if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
                ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
            }
            if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
                ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
                ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
            }

            MessageAck earlyAck = null;
            if (message.isExpired()) {
                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(message.getMessageId());
            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
                earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
            }
            if (earlyAck != null) {
                try {
                    asyncSendPacket(earlyAck);
                } catch (Throwable t) {
                    LOG.error("error dispatching ack: {} ", earlyAck, t);
                    connection.onClientInternalException(t);
                } finally {
                    continue;
                }
            }

            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                message.setAcknowledgeCallback(new Callback() {
                    @Override
                    public void execute() throws Exception {
                    }
                });
            }

            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            md.setDeliverySequenceId(getNextDeliveryId());
            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();

            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);

            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
            /*
            * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
            * We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
            * */
            synchronized (redeliveryGuard) {
                try {
                    ack.setFirstMessageId(md.getMessage().getMessageId());
                    doStartTransaction();
                    ack.setTransactionId(getTransactionContext().getTransactionId());
                    if (ack.getTransactionId() != null) {
                        getTransactionContext().addSynchronization(new Synchronization() {

                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());

                            @Override
                            public void beforeEnd() throws Exception {
                                // validate our consumer so we don't push stale acks that get ignored
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
                                }
                                LOG.trace("beforeEnd ack {}", ack);
                                sendAck(ack);
                            }

                            @Override
                            public void afterRollback() throws Exception {
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("afterRollback {}", ack, new Throwable("here"));
                                }
                                // ensure we don't filter this as a duplicate
                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());

                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
                                if (clearRequestsCounter.get() > clearRequestCount) {
                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                        && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                    // We need to NACK the messages so that they get
                                    // sent to the
                                    // DLQ.
                                    // Acknowledge the last message.
                                    MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                    LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
                                    asyncSendPacket(ack);

                                } else {

                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    asyncSendPacket(ack);

                                    // Figure out how long we should wait to resend
                                    // this message.
                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                    for (int i = 0; i < redeliveryCounter; i++) {
                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                    }

                                    /*
                                    * If we are a non blocking delivery then we need to stop the executor to avoid more
                                    * messages being delivered, once the message is redelivered we can restart it.
                                    * */
                                    if (!connection.isNonBlockingRedelivery()) {
                                        LOG.debug("Blocking session until re-delivery...");
                                        executor.stop();
                                    }

                                    connection.getScheduler().executeAfterDelay(new Runnable() {

                                        @Override
                                        public void run() {
                                            /*
                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
                                            * */
                                            synchronized (redeliveryGuard) {
                                                /*
                                                * If its non blocking then we can just dispatch in a new session.
                                                * */
                                                if (connection.isNonBlockingRedelivery()) {
                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                } else {
                                                    /*
                                                    * If there has been an error thrown during afterDelivery then the
                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
                                                    * the session marked as stale), in this case we can only call dispatch
                                                    * which will create a new session with a new endpoint.
                                                    * */
                                                    if (afterDeliveryError.get()) {
                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                    } else {
                                                        executor.executeFirst(md);
                                                        executor.start();
                                                    }
                                                }
                                            }
                                        }
                                    }, redeliveryDelay);
                                }
                                md.getMessage().onMessageRolledBack();
                            }
                        });
                    }

                    LOG.trace("{} onMessage({})", this, message.getMessageId());
                    messageListener.onMessage(message);

                } catch (Throwable e) {
                    if (!isClosed()) {
                        LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e);
                    }

                    if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) {
                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
                        getTransactionContext().setRollbackOnly(true);
                    }

                    // A problem while invoking the MessageListener does not
                    // in general indicate a problem with the connection to the broker, i.e.
                    // it will usually be sufficient to let the afterDelivery() method either
                    // commit or roll back in order to deal with the exception.
                    // However, we notify any registered client internal exception listener
                    // of the problem.
                    connection.onClientInternalException(e);
                } finally {
                    if (ack.getTransactionId() == null) {
                        try {
                            asyncSendPacket(ack);
                        } catch (Throwable e) {
                            connection.onClientInternalException(e);
                        }
                    }
                }

                if (deliveryListener != null) {
                    try {
                        deliveryListener.afterDelivery(this, message);
                    } catch (Throwable t) {
                        LOG.debug("Unable to call after delivery", t);
                        afterDeliveryError.set(true);
                        throw new RuntimeException(t);
                    }
                }
            }
            /*
            * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
            * It also needs to be outside the redelivery guard.
            * */
            try {
                executor.waitForQueueRestart();
            } catch (InterruptedException ex) {
                connection.onClientInternalException(ex);
            }
        }
    }

    /**
     * Creates a <CODE>MessageProducer</CODE> to send messages to the
     * specified destination.
     * <P>
     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
     * inherit from <CODE>Destination</CODE>, they can be used in the
     * destination parameter to create a <CODE>MessageProducer</CODE> object.
     *
     * @param destination the <CODE>Destination</CODE> to send to, or null if
     *                this is a producer which does not have a specified
     *                destination.
     * @return the MessageProducer
     * @throws JMSException if the session fails to create a MessageProducer due
     *                 to some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @since 1.1
     */
    @Override
    public MessageProducer createProducer(Destination destination) throws JMSException {
        checkClosed();
        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createProducer(this);
        }
        int timeSendOut = connection.getSendTimeout();
        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
    }

    /**
     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
     * <CODE>Destination</CODE>, they can be used in the destination
     * parameter to create a <CODE>MessageConsumer</CODE>.
     *
     * @param destination the <CODE>Destination</CODE> to access.
     * @return the MessageConsumer
     * @throws JMSException if the session fails to create a consumer due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @since 1.1
     */
    @Override
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        return createConsumer(destination, (String) null);
    }

    /**
     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
     * using a message selector. Since <CODE> Queue</CODE> and
     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
     * can be used in the destination parameter to create a
     * <CODE>MessageConsumer</CODE>.
     * <P>
     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
     * that have been sent to a destination.
     *
     * @param destination the <CODE>Destination</CODE> to access
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @return the MessageConsumer
     * @throws JMSException if the session fails to create a MessageConsumer due
     *                 to some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @throws InvalidSelectorException if the message selector is invalid.
     * @since 1.1
     */
    @Override
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        return createConsumer(destination, messageSelector, false);
    }

    /**
     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
     * <CODE>Destination</CODE>, they can be used in the destination
     * parameter to create a <CODE>MessageConsumer</CODE>.
     *
     * @param destination the <CODE>Destination</CODE> to access.
     * @param messageListener the listener to use for async consumption of messages
     * @return the MessageConsumer
     * @throws JMSException if the session fails to create a consumer due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @since 1.1
     */
    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
        return createConsumer(destination, null, messageListener);
    }

    /**
     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
     * using a message selector. Since <CODE> Queue</CODE> and
     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
     * can be used in the destination parameter to create a
     * <CODE>MessageConsumer</CODE>.
     * <P>
     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
     * that have been sent to a destination.
     *
     * @param destination the <CODE>Destination</CODE> to access
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @param messageListener the listener to use for async consumption of messages
     * @return the MessageConsumer
     * @throws JMSException if the session fails to create a MessageConsumer due
     *                 to some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @throws InvalidSelectorException if the message selector is invalid.
     * @since 1.1
     */
    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
        return createConsumer(destination, messageSelector, false, messageListener);
    }

    /**
     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
     * using a message selector. This method can specify whether messages
     * published by its own connection should be delivered to it, if the
     * destination is a topic.
     * <P>
     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
     * <CODE>Destination</CODE>, they can be used in the destination
     * parameter to create a <CODE>MessageConsumer</CODE>.
     * <P>
     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
     * that have been published to a destination.
     * <P>
     * In some cases, a connection may both publish and subscribe to a topic.
     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
     * inhibit the delivery of messages published by its own connection. The
     * default value for this attribute is False. The <CODE>noLocal</CODE>
     * value must be supported by destinations that are topics.
     *
     * @param destination the <CODE>Destination</CODE> to access
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @param noLocal - if true, and the destination is a topic, inhibits the
     *                delivery of messages published by its own connection. The
     *                behavior for <CODE>NoLocal</CODE> is not specified if
     *                the destination is a queue.
     * @return the MessageConsumer
     * @throws JMSException if the session fails to create a MessageConsumer due
     *                 to some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @throws InvalidSelectorException if the message selector is invalid.
     * @since 1.1
     */
    @Override
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        return createConsumer(destination, messageSelector, noLocal, null);
    }

    /**
     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
     * using a message selector. This method can specify whether messages
     * published by its own connection should be delivered to it, if the
     * destination is a topic.
     * <P>
     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
     * <CODE>Destination</CODE>, they can be used in the destination
     * parameter to create a <CODE>MessageConsumer</CODE>.
     * <P>
     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
     * that have been published to a destination.
     * <P>
     * In some cases, a connection may both publish and subscribe to a topic.
     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
     * inhibit the delivery of messages published by its own connection. The
     * default value for this attribute is False. The <CODE>noLocal</CODE>
     * value must be supported by destinations that are topics.
     *
     * @param destination the <CODE>Destination</CODE> to access
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @param noLocal - if true, and the destination is a topic, inhibits the
     *                delivery of messages published by its own connection. The
     *                behavior for <CODE>NoLocal</CODE> is not specified if
     *                the destination is a queue.
     * @param messageListener the listener to use for async consumption of messages
     * @return the MessageConsumer
     * @throws JMSException if the session fails to create a MessageConsumer due
     *                 to some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @throws InvalidSelectorException if the message selector is invalid.
     * @since 1.1
     */
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
        checkClosed();

        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createConsumer(this, messageSelector, noLocal);
        }

        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
        int prefetch = 0;
        if (destination instanceof Topic) {
            prefetch = prefetchPolicy.getTopicPrefetch();
        } else {
            prefetch = prefetchPolicy.getQueuePrefetch();
        }
        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
    }

    /**
     * Creates a queue identity given a <CODE>Queue</CODE> name.
     * <P>
     * This facility is provided for the rare cases where clients need to
     * dynamically manipulate queue identity. It allows the creation of a queue
     * identity with a provider-specific name. Clients that depend on this
     * ability are not portable.
     * <P>
     * Note that this method is not for creating the physical queue. The
     * physical creation of queues is an administrative task and is not to be
     * initiated by the JMS API. The one exception is the creation of temporary
     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
     * method.
     *
     * @param queueName the name of this <CODE>Queue</CODE>
     * @return a <CODE>Queue</CODE> with the given name
     * @throws JMSException if the session fails to create a queue due to some
     *                 internal error.
     * @since 1.1
     */
    @Override
    public Queue createQueue(String queueName) throws JMSException {
        checkClosed();
        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
            return new ActiveMQTempQueue(queueName);
        }
        return new ActiveMQQueue(queueName);
    }

    /**
     * Creates a topic identity given a <CODE>Topic</CODE> name.
     * <P>
     * This facility is provided for the rare cases where clients need to
     * dynamically manipulate topic identity. This allows the creation of a
     * topic identity with a provider-specific name. Clients that depend on this
     * ability are not portable.
     * <P>
     * Note that this method is not for creating the physical topic. The
     * physical creation of topics is an administrative task and is not to be
     * initiated by the JMS API. The one exception is the creation of temporary
     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
     * method.
     *
     * @param topicName the name of this <CODE>Topic</CODE>
     * @return a <CODE>Topic</CODE> with the given name
     * @throws JMSException if the session fails to create a topic due to some
     *                 internal error.
     * @since 1.1
     */
    @Override
    public Topic createTopic(String topicName) throws JMSException {
        checkClosed();
        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
            return new ActiveMQTempTopic(topicName);
        }
        return new ActiveMQTopic(topicName);
    }
   
    @Override
    public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
        throw new UnsupportedOperationException("createSharedConsumer(Topic, sharedSubscriptionName) is not supported");
    }

    @Override
    public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException {
        throw new UnsupportedOperationException("createSharedConsumer(Topic, sharedSubscriptionName, messageSelector) is not supported");
    }

    @Override
    public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
        checkClosed();
        return createDurableSubscriber(topic, name, null, false);
    }

    @Override
    public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        checkClosed();
        return createDurableSubscriber(topic, name, messageSelector, noLocal);
    }

    @Override
    public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
        throw new UnsupportedOperationException("createSharedDurableConsumer(Topic, name) is not supported");
    }

    @Override
    public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException {
        throw new UnsupportedOperationException("createSharedDurableConsumer(Topic, name, messageSelector) is not supported");
    }

	/**
     * Creates a durable subscriber to the specified topic.
     * <P>
     * If a client needs to receive all the messages published on a topic,
     * including the ones published while the subscriber is inactive, it uses a
     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
     * record of this durable subscription and insures that all messages from
     * the topic's publishers are retained until they are acknowledged by this
     * durable subscriber or they have expired.
     * <P>
     * Sessions with durable subscribers must always provide the same client
     * identifier. In addition, each client must specify a name that uniquely
     * identifies (within client identifier) each durable subscription it
     * creates. Only one session at a time can have a
     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
     * <P>
     * A client can change an existing durable subscription by creating a
     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
     * and/or message selector. Changing a durable subscriber is equivalent to
     * unsubscribing (deleting) the old one and creating a new one.
     * <P>
     * In some cases, a connection may both publish and subscribe to a topic.
     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
     * inhibit the delivery of messages published by its own connection. The
     * default value for this attribute is false.
     *
     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
     * @param name the name used to identify this subscription
     * @return the TopicSubscriber
     * @throws JMSException if the session fails to create a subscriber due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid topic is specified.
     * @since 1.1
     */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        checkClosed();
        return createDurableSubscriber(topic, name, null, false);
    }

    /**
     * Creates a durable subscriber to the specified topic, using a message
     * selector and specifying whether messages published by its own connection
     * should be delivered to it.
     * <P>
     * If a client needs to receive all the messages published on a topic,
     * including the ones published while the subscriber is inactive, it uses a
     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
     * record of this durable subscription and insures that all messages from
     * the topic's publishers are retained until they are acknowledged by this
     * durable subscriber or they have expired.
     * <P>
     * Sessions with durable subscribers must always provide the same client
     * identifier. In addition, each client must specify a name which uniquely
     * identifies (within client identifier) each durable subscription it
     * creates. Only one session at a time can have a
     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
     * inactive durable subscriber is one that exists but does not currently
     * have a message consumer associated with it.
     * <P>
     * A client can change an existing durable subscription by creating a
     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
     * and/or message selector. Changing a durable subscriber is equivalent to
     * unsubscribing (deleting) the old one and creating a new one.
     *
     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
     * @param name the name used to identify this subscription
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @param noLocal if set, inhibits the delivery of messages published by its
     *                own connection
     * @return the Queue Browser
     * @throws JMSException if the session fails to create a subscriber due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid topic is specified.
     * @throws InvalidSelectorException if the message selector is invalid.
     * @since 1.1
     */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        checkClosed();

        if (topic == null) {
            throw new InvalidDestinationException("Topic cannot be null");
        }

        if (topic instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)topic;
            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
        }

        connection.checkClientIDWasManuallySpecified();
        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
                                           noLocal, false, asyncDispatch);
    }

    /**
     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
     * the specified queue.
     *
     * @param queue the <CODE>queue</CODE> to access
     * @return the Queue Browser
     * @throws JMSException if the session fails to create a browser due to some
     *                 internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified
     * @since 1.1
     */
    @Override
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        checkClosed();
        return createBrowser(queue, null);
    }

    /**
     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
     * the specified queue using a message selector.
     *
     * @param queue the <CODE>queue</CODE> to access
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @return the Queue Browser
     * @throws JMSException if the session fails to create a browser due to some
     *                 internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified
     * @throws InvalidSelectorException if the message selector is invalid.
     * @since 1.1
     */
    @Override
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        checkClosed();
        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
    }

    /**
     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
     * of the <CODE>Connection</CODE> unless it is deleted earlier.
     *
     * @return a temporary queue identity
     * @throws JMSException if the session fails to create a temporary queue due
     *                 to some internal error.
     * @since 1.1
     */
    @Override
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        checkClosed();
        return (TemporaryQueue)connection.createTempDestination(false);
    }

    /**
     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
     * of the <CODE>Connection</CODE> unless it is deleted earlier.
     *
     * @return a temporary topic identity
     * @throws JMSException if the session fails to create a temporary topic due
     *                 to some internal error.
     * @since 1.1
     */
    @Override
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        checkClosed();
        return (TemporaryTopic)connection.createTempDestination(true);
    }

    /**
     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
     * the specified queue.
     *
     * @param queue the <CODE>Queue</CODE> to access
     * @return a new QueueBrowser instance.
     * @throws JMSException if the session fails to create a receiver due to
     *                 some internal error.
     * @throws JMSException
     * @throws InvalidDestinationException if an invalid queue is specified.
     */
    @Override
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        checkClosed();
        return createReceiver(queue, null);
    }

    /**
     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
     * the specified queue using a message selector.
     *
     * @param queue the <CODE>Queue</CODE> to access
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @return QueueReceiver
     * @throws JMSException if the session fails to create a receiver due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid queue is specified.
     * @throws InvalidSelectorException if the message selector is invalid.
     */
    @Override
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
        checkClosed();

        if (queue instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)queue;
            return customDestination.createReceiver(this, messageSelector);
        }

        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
    }

    /**
     * Creates a <CODE>QueueSender</CODE> object to send messages to the
     * specified queue.
     *
     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
     *                unidentified producer
     * @return QueueSender
     * @throws JMSException if the session fails to create a sender due to some
     *                 internal error.
     * @throws InvalidDestinationException if an invalid queue is specified.
     */
    @Override
    public QueueSender createSender(Queue queue) throws JMSException {
        checkClosed();
        if (queue instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)queue;
            return customDestination.createSender(this);
        }
        int timeSendOut = connection.getSendTimeout();
        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
    }

    /**
     * Creates a nondurable subscriber to the specified topic. <p/>
     * <P>
     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
     * that have been published to a topic. <p/>
     * <P>
     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
     * receive only messages that are published while they are active. <p/>
     * <P>
     * In some cases, a connection may both publish and subscribe to a topic.
     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
     * inhibit the delivery of messages published by its own connection. The
     * default value for this attribute is false.
     *
     * @param topic the <CODE>Topic</CODE> to subscribe to
     * @return TopicSubscriber
     * @throws JMSException if the session fails to create a subscriber due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid topic is specified.
     */
    @Override
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        checkClosed();
        return createSubscriber(topic, null, false);
    }

    /**
     * Creates a nondurable subscriber to the specified topic, using a message
     * selector or specifying whether messages published by its own connection
     * should be delivered to it. <p/>
     * <P>
     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
     * that have been published to a topic. <p/>
     * <P>
     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
     * receive only messages that are published while they are active. <p/>
     * <P>
     * Messages filtered out by a subscriber's message selector will never be
     * delivered to the subscriber. From the subscriber's perspective, they do
     * not exist. <p/>
     * <P>
     * In some cases, a connection may both publish and subscribe to a topic.
     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
     * inhibit the delivery of messages published by its own connection. The
     * default value for this attribute is false.
     *
     * @param topic the <CODE>Topic</CODE> to subscribe to
     * @param messageSelector only messages with properties matching the message
     *                selector expression are delivered. A value of null or an
     *                empty string indicates that there is no message selector
     *                for the message consumer.
     * @param noLocal if set, inhibits the delivery of messages published by its
     *                own connection
     * @return TopicSubscriber
     * @throws JMSException if the session fails to create a subscriber due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid topic is specified.
     * @throws InvalidSelectorException if the message selector is invalid.
     */
    @Override
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        checkClosed();

        if (topic instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)topic;
            return customDestination.createSubscriber(this, messageSelector, noLocal);
        }

        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
    }

    /**
     * Creates a publisher for the specified topic. <p/>
     * <P>
     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
     * a topic, it defines a new sequence of messages that have no ordering
     * relationship with the messages it has previously sent.
     *
     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
     *                an unidentified producer
     * @return TopicPublisher
     * @throws JMSException if the session fails to create a publisher due to
     *                 some internal error.
     * @throws InvalidDestinationException if an invalid topic is specified.
     */
    @Override
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        checkClosed();

        if (topic instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)topic;
            return customDestination.createPublisher(this);
        }
        int timeSendOut = connection.getSendTimeout();
        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
    }

    /**
     * Unsubscribes a durable subscription that has been created by a client.
     * <P>
     * This method deletes the state being maintained on behalf of the
     * subscriber by its provider.
     * <P>
     * It is erroneous for a client to delete a durable subscription while there
     * is an active <CODE>MessageConsumer </CODE> or
     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
     * message is part of a pending transaction or has not been acknowledged in
     * the session.
     *
     * @param name the name used to identify this subscription
     * @throws JMSException if the session fails to unsubscribe to the durable
     *                 subscription due to some internal error.
     * @throws InvalidDestinationException if an invalid subscription name is
     *                 specified.
     * @since 1.1
     */
    @Override
    public void unsubscribe(String name) throws JMSException {
        checkClosed();
        connection.unsubscribe(name);
    }

    @Override
    public void dispatch(MessageDispatch messageDispatch) {
        try {
            executor.execute(messageDispatch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            connection.onClientInternalException(e);
        }
    }

    /**
     * Acknowledges all consumed messages of the session of this consumed
     * message.
     * <P>
     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
     * for use when a client has specified that its JMS session's consumed
     * messages are to be explicitly acknowledged. By invoking
     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
     * all messages consumed by the session that the message was delivered to.
     * <P>
     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
     * sessions and sessions specified to use implicit acknowledgement modes.
     * <P>
     * A client may individually acknowledge each message as it is consumed, or
     * it may choose to acknowledge messages as an application-defined group
     * (which is done by calling acknowledge on the last received message of the
     * group, thereby acknowledging all messages consumed by the session.)
     * <P>
     * Messages that have been received but not acknowledged may be redelivered.
     *
     * @throws JMSException if the JMS provider fails to acknowledge the
     *                 messages due to some internal error.
     * @throws javax.jms.IllegalStateException if this method is called on a
     *                 closed session.
     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
     */
    public void acknowledge() throws JMSException {
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.acknowledge();
        }
    }

    /**
     * Add a message consumer.
     *
     * @param consumer - message consumer.
     * @throws JMSException
     */
    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
        this.consumers.add(consumer);
        if (consumer.isDurableSubscriber()) {
            stats.onCreateDurableSubscriber();
        }
        this.connection.addDispatcher(consumer.getConsumerId(), this);
    }

    /**
     * Remove the message consumer.
     *
     * @param consumer - consumer to be removed.
     * @throws JMSException
     */
    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
        this.connection.removeDispatcher(consumer.getConsumerId());
        if (consumer.isDurableSubscriber()) {
            stats.onRemoveDurableSubscriber();
        }
        this.consumers.remove(consumer);
        this.connection.removeDispatcher(consumer);
    }

    /**
     * Adds a message producer.
     *
     * @param producer - message producer to be added.
     * @throws JMSException
     */
    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
        this.producers.add(producer);
        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
    }

    /**
     * Removes a message producer.
     *
     * @param producer - message producer to be removed.
     * @throws JMSException
     */
    protected void removeProducer(ActiveMQMessageProducer producer) {
        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
        this.producers.remove(producer);
    }

    /**
     * Start this Session.
     *
     * @throws JMSException
     */
    protected void start() throws JMSException {
        started.set(true);
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.start();
        }
        executor.start();
    }

    /**
     * Stops this session.
     *
     * @throws JMSException
     */
    protected void stop() throws JMSException {

        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.stop();
        }

        started.set(false);
        executor.stop();
    }

    /**
     * Returns the session id.
     *
     * @return value - session id.
     */
    protected SessionId getSessionId() {
        return info.getSessionId();
    }

    /**
     * @return a unique ConsumerId instance.
     */
    protected ConsumerId getNextConsumerId() {
        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
    }

    /**
     * @return a unique ProducerId instance.
     */
    protected ProducerId getNextProducerId() {
        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
    }

    /**
     * Sends the message for dispatch by the broker.
     *
     * @param producer - message producer.
     * @param destination - message destination.
     * @param message - message to be sent.
     * @param deliveryMode - JMS message delivery mode.
     * @param priority - message priority.
     * @param timeToLive - message expiration.
     * @param producerWindow
     * @param onComplete
     * @throws JMSException
     */
    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
        send(producer, destination, message, deliveryMode, priority, timeToLive, producer.getDisableMessageID(), producer.getDisableMessageID(), producerWindow, sendTimeout, onComplete);
    }

    /**
     * Sends the message for dispatch by the broker.
     *
     * @param producer - message producer.
     * @param destination - message destination.
     * @param message - message to be sent.
     * @param deliveryMode - JMS message delivery mode.
     * @param priority - message priority.
     * @param timeToLive - message expiration.
     * @param disableTimestamp - disable timestamp.
     * @param disableMessageID - optionally, disable messageID.
     * @param producerWindow
     * @param onComplete
     * @throws JMSException
     */
    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        boolean disableMessageID, boolean disableMessageTimestamp, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            doStartTransaction();
            if (transactionContext.isRollbackOnly()) {
                throw new IllegalStateException("transaction marked rollback only");
            }
            TransactionId txid = transactionContext.getTransactionId();
            long sequenceNumber = producer.getMessageSequence();

            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
            message.setJMSDeliveryMode(deliveryMode);
            long expiration = 0L;
            long timeStamp = System.currentTimeMillis();
            if (timeToLive > 0) {
                expiration = timeToLive + timeStamp;
            }

            // TODO: AMQ-8500 - update this when openwire supports JMSDeliveryTime
            // ref: ActiveMQMessageTransformation#copyProperties
            if(!(message instanceof ActiveMQMessage)) {
                setForeignMessageDeliveryTime(message, timeStamp);
            } else {
                message.setJMSDeliveryTime(timeStamp);
            }
            if (!disableMessageTimestamp && !producer.getDisableMessageTimestamp()) {
                message.setJMSTimestamp(timeStamp);
            } else {
                message.setJMSTimestamp(0l);
            }
            message.setJMSExpiration(expiration);
            message.setJMSPriority(priority);
            message.setJMSRedelivered(false);

            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
            msg.setDestination(destination);
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));

            // Set the message id.
            if (msg != message) {
                message.setJMSMessageID(msg.getMessageId().toString());
                // Make sure the JMS destination is set on the foreign messages too.
                message.setJMSDestination(destination);
            }
            //clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);

            msg.setTransactionId(txid);
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage)msg.copy();
            }
            msg.setConnection(connection);
            msg.onSend();
            msg.setProducerId(msg.getMessageId().getProducerId());
            if (LOG.isTraceEnabled()) {
                LOG.trace(getSessionId() + " sending message: " + msg);
            }
            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not
                    // provide and accurate size. We may change over to doing
                    // more aggressive marshaling,
                    // to get more accurate sizes.. this is more important once
                    // users start using producer window
                    // flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }
            } else {
                if (sendTimeout > 0 && onComplete==null) {
                    this.connection.syncSendPacket(msg,sendTimeout);
                }else {
                    this.connection.syncSendPacket(msg, onComplete);
                }
            }

        }
    }

    /**
     * Send TransactionInfo to indicate transaction has started
     *
     * @throws JMSException if some internal error occurs
     */
    protected void doStartTransaction() throws JMSException {
        if (getTransacted() && !transactionContext.isInXATransaction()) {
            transactionContext.begin();
        }
    }

    /**
     * Checks whether the session has unconsumed messages.
     *
     * @return true - if there are unconsumed messages.
     */
    public boolean hasUncomsumedMessages() {
        return executor.hasUncomsumedMessages();
    }

    /**
     * Checks whether the session uses transactions.
     *
     * @return true - if the session uses transactions.
     */
    public boolean isTransacted() {
        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
    }

    /**
     * Checks whether the session used client acknowledgment.
     *
     * @return true - if the session uses client acknowledgment.
     */
    protected boolean isClientAcknowledge() {
        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
    }

    /**
     * Checks whether the session used auto acknowledgment.
     *
     * @return true - if the session uses client acknowledgment.
     */
    public boolean isAutoAcknowledge() {
        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
    }

    /**
     * Checks whether the session used dup ok acknowledgment.
     *
     * @return true - if the session uses client acknowledgment.
     */
    public boolean isDupsOkAcknowledge() {
        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
    }

    public boolean isIndividualAcknowledge(){
        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
    }

    /**
     * Returns the message delivery listener.
     *
     * @return deliveryListener - message delivery listener.
     */
    public DeliveryListener getDeliveryListener() {
        return deliveryListener;
    }

    /**
     * Sets the message delivery listener.
     *
     * @param deliveryListener - message delivery listener.
     */
    public void setDeliveryListener(DeliveryListener deliveryListener) {
        this.deliveryListener = deliveryListener;
    }

    /**
     * Returns the SessionInfo bean.
     *
     * @return info - SessionInfo bean.
     * @throws JMSException
     */
    protected SessionInfo getSessionInfo() throws JMSException {
        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
        return info;
    }

    /**
     * Send the asynchronous command.
     *
     * @param command - command to be executed.
     * @throws JMSException
     */
    public void asyncSendPacket(Command command) throws JMSException {
        connection.asyncSendPacket(command);
    }

    /**
     * Send the synchronous command.
     *
     * @param command - command to be executed.
     * @return Response
     * @throws JMSException
     */
    public Response syncSendPacket(Command command) throws JMSException {
        return connection.syncSendPacket(command);
    }

    public long getNextDeliveryId() {
        return deliveryIdGenerator.getNextSequenceId();
    }

    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {

        List<MessageDispatch> c = unconsumedMessages.removeAll();
        Collections.reverse(c);

        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
            MessageDispatch md = iter.next();
            executor.executeFirst(md);
        }

    }

    public boolean isRunning() {
        return started.get();
    }

    public boolean isAsyncDispatch() {
        return asyncDispatch;
    }

    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }

    /**
     * @return Returns the sessionAsyncDispatch.
     */
    public boolean isSessionAsyncDispatch() {
        return sessionAsyncDispatch;
    }

    /**
     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
     */
    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
        this.sessionAsyncDispatch = sessionAsyncDispatch;
    }

    public MessageTransformer getTransformer() {
        return transformer;
    }

    public ActiveMQConnection getConnection() {
        return connection;
    }

    /**
     * Sets the transformer used to transform messages before they are sent on
     * to the JMS bus or when they are received from the bus but before they are
     * delivered to the JMS client
     */
    public void setTransformer(MessageTransformer transformer) {
        this.transformer = transformer;
    }

    public BlobTransferPolicy getBlobTransferPolicy() {
        return blobTransferPolicy;
    }

    /**
     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
     * OBjects) are transferred from producers to brokers to consumers
     */
    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
        this.blobTransferPolicy = blobTransferPolicy;
    }

    public List<MessageDispatch> getUnconsumedMessages() {
        return executor.getUnconsumedMessages();
    }

    @Override
    public String toString() {
        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + ",closed=" + closed + "} " + sendMutex;
    }

    public void checkMessageListener() throws JMSException {
        if (messageListener != null) {
            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
        }
        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
            ActiveMQMessageConsumer consumer = i.next();
            if (consumer.hasMessageListener()) {
                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
            }
        }
    }

    protected void setOptimizeAcknowledge(boolean value) {
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.setOptimizeAcknowledge(value);
        }
    }

    protected void setPrefetchSize(ConsumerId id, int prefetch) {
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            if (c.getConsumerId().equals(id)) {
                c.setPrefetchSize(prefetch);
                break;
            }
        }
    }

    protected void close(ConsumerId id) {
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            if (c.getConsumerId().equals(id)) {
                try {
                    c.close();
                } catch (JMSException e) {
                    LOG.warn("Exception closing consumer", e);
                }
                LOG.warn("Closed consumer on Command, " + id);
                break;
            }
        }
    }

    public boolean isInUse(ActiveMQTempDestination destination) {
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            if (c.isInUse(destination)) {
                return true;
            }
        }
        return false;
    }

    /**
     * highest sequence id of the last message delivered by this session.
     * Passed to the broker in the close command, maintained by dispose()
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return lastDeliveredSequenceId;
    }

    protected void sendAck(MessageAck ack) throws JMSException {
        sendAck(ack,false);
    }

    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
            asyncSendPacket(ack);
        } else {
            syncSendPacket(ack);
        }
    }

    protected Scheduler getScheduler() throws JMSException {
        return this.connection.getScheduler();
    }

    protected ThreadPoolExecutor getConnectionExecutor() {
        return this.connectionExecutor;
    }

    private static void setForeignMessageDeliveryTime(final Message foreignMessage, final long deliveryTime) throws JMSException {
        // Check for JMS v2 message via presence of setJMSDeliveryTime
        Method deliveryTimeMethod = null;
        try {
            Class<?> clazz = foreignMessage.getClass();
            Method method = clazz.getMethod("setJMSDeliveryTime", new Class[] { long.class });
            if (!Modifier.isAbstract(method.getModifiers())) {
                deliveryTimeMethod = method;
            }
        } catch (NoSuchMethodException e) {
            // non-JMS v2 message
        }

        if (deliveryTimeMethod != null) {
            foreignMessage.setJMSDeliveryTime(deliveryTime);
        }
    }
}
