/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client;

import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.jms.*;
import javax.jms.IllegalStateException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQPEncodedListMessage;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.JMSObjectMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageEncryptionHelper;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.ListMessage;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Strings;

/*
 * TODO  Different FailoverSupport implementation are needed on the same method call, in different situations. For
 * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
 * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
 * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
 * has been reestablished. All fail-over protected operations should be placed in private methods, with
 * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
 * fail-over process sets a nowait flag and uses an async method call instead.
 * TODO  Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
 * after looking at worse bottlenecks first.
 */
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
    /** Used for debugging. */
    private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);

    /** System property to configure dispatcher shutdown timeout in milliseconds. */
    public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS = "DISPATCHER_SHUTDOWN_TIMEOUT_MS";
    /** Dispatcher shutdown timeout default setting. */
    public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT = "1000";

    /** System property to enable strict AMQP compliance. */
    public static final String STRICT_AMQP = "STRICT_AMQP";

    /** Strict AMQP default setting. */
    public static final String STRICT_AMQP_DEFAULT = "false";

    /** System property to enable failure if strict AMQP compliance is violated. */
    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";

    /** Strict AMQP failure default. */
    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";

    /** System property to enable immediate message prefetching. */
    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";

    /** Immediate message prefetch default. */
    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";

    private final boolean _declareQueues =
        Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));

    private final boolean _declareExchanges =
        Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true"));

    private final boolean _bindQueues =
            Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "true"));

    private final boolean _useAMQPEncodedMapMessage;

    private final boolean _useAMQPEncodedStreamMessage;

    /**
     * Flag indicating to start dispatcher as a daemon thread
     */
    protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);

    private final Map<AMQDestination, WeakReference<AMQDestination>>
            _resolvedDestinations = Collections.synchronizedMap(new WeakHashMap<AMQDestination, WeakReference<AMQDestination>> ());

    private final long _dispatcherShutdownTimeoutMs;

    /** The connection to which this session belongs. */
    private AMQConnection _connection;

    /** Used to indicate whether or not this is a transactional session. */
    private final boolean _transacted;

    /** Holds the sessions acknowledgement mode. */
    private final int _acknowledgeMode;

    /** Holds this session unique identifier, used to distinguish it from other sessions. */
    private int _channelId;

    private int _ticket;

    /** Holds the high mark for prefetched message, at which the session is suspended. */
    private final int _prefetchHighMark;

    /** Holds the low mark for prefetched messages, below which the session is resumed. */
    private final int _prefetchLowMark;

    /** Holds the message listener, if any, which is attached to this session. */
    private MessageListener _messageListener = null;

    /** Used to indicate that this session has been started at least once. */
    private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);

    private final ConcurrentMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
            new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();

    private final ConcurrentMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();

    private final Lock _subscriberDetails = new ReentrantLock(true);
    private final Lock _subscriberAccess = new ReentrantLock(true);

    private final FlowControllingBlockingQueue<Dispatchable> _queue;

    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
    private final AtomicLong _rollbackMark = new AtomicLong(-1);

    private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();

    private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();

    private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();

    private volatile Dispatcher _dispatcher;

    private volatile Thread _dispatcherThread;

    private MessageFactoryRegistry _messageFactoryRegistry;

    /** Holds all of the producers created by this session, keyed by their unique identifiers. */
    private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();

    /**
     * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
     * methods.
     */
    private int _nextTag = 1;

    private final Map<String,C> _consumers = new ConcurrentHashMap<>();

    /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
    private ConcurrentMap<Destination, AtomicInteger> _destinationConsumerCount =
            new ConcurrentHashMap<Destination, AtomicInteger>();

    /**
     * Used as a source of unique identifiers for producers within the session.
     *
     * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one
     * thread of control is allowed to create producers for any given session instance.
     */
    private long _nextProducerId;

    /**
     * Set when recover is called. This is to handle the case where recover() is called by application code during
     * onMessage() processing to ensure that an auto ack is not sent.
     */
    private volatile boolean _sessionInRecovery;

    private volatile boolean _usingDispatcherForCleanup;

    /** Used to indicates that the connection to which this session belongs, has been stopped. */
    private final AtomicBoolean _connectionStopped = new AtomicBoolean();

    /** Used to indicate that this session has a message listener attached to it. */
    private boolean _hasMessageListeners;

    /** Used to indicate that this session has been suspended. */
    private boolean _suspended;

    /**
     * Used to protect the suspension of this session, so that critical code can be executed during suspension,
     * without the session being resumed by other threads.
     */
    private final Object _suspensionLock = new Object();

    private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);

    private final boolean _immediatePrefetch;

    private final boolean _strictAMQP;

    private final boolean _strictAMQPFATAL;
    private final Lock _messageDeliveryLock = new ReentrantLock(true);

    /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
    private boolean _dirty;
    /** Has failover occured on this session with outstanding actions to commit? */
    private boolean _failedOverDirty;

    private MessageEncryptionHelper _messageEncryptionHelper;

    /** Holds the highest received delivery tag. */
    protected AtomicLong getHighestDeliveryTag()
    {
        return _highestDeliveryTag;
    }

    /** Pre-fetched message tags */
    protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags()
    {
        return _prefetchedMessageTags;
    }

    /** All the not yet acknowledged message tags */
    protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags()
    {
        return _unacknowledgedMessageTags;
    }

    /** All the delivered message tags */
    protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags()
    {
        return _deliveredMessageTags;
    }

    /** Holds the dispatcher thread for this session. */
    protected Dispatcher getDispatcher()
    {
        return _dispatcher;
    }

    protected Thread getDispatcherThread()
    {
        return _dispatcherThread;
    }

    /** Holds the message factory factory for this session. */
    protected MessageFactoryRegistry getMessageFactoryRegistry()
    {
        return _messageFactoryRegistry;
    }

    private final ExecutorService _flowControlNoAckTaskPool;

    /**
     * Consumers associated with this session
     */
    protected Collection<C> getConsumers()
    {
        return new ArrayList<>(_consumers.values());
    }

    protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
    {
        _usingDispatcherForCleanup = usingDispatcherForCleanup;
    }

    /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
    protected boolean isImmediatePrefetch()
    {
        return _immediatePrefetch;
    }

    abstract void handleNodeDelete(final AMQDestination dest) throws QpidException;

    abstract void handleLinkDelete(final AMQDestination dest) throws QpidException;

    /**
     * Creates a new session on a connection.
     * @param con                     The connection on which to create the session.
     * @param channelId               The unique identifier for the session.
     * @param transacted              Indicates whether or not the session is transactional.
     * @param acknowledgeMode         The acknowledgement mode for the session.
     * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
     * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
     */
    protected AMQSession(AMQConnection con,
                         int channelId,
                         boolean transacted,
                         int acknowledgeMode,
                         int defaultPrefetchHighMark,
                         int defaultPrefetchLowMark)
    {
        _useAMQPEncodedMapMessage = con == null || !con.isUseLegacyMapMessageFormat();
        _useAMQPEncodedStreamMessage = con != null && !con.isUseLegacyStreamMessageFormat();
        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
        _strictAMQPFATAL =
                Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
        _immediatePrefetch =
                _strictAMQP
                || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
        _dispatcherShutdownTimeoutMs = Integer.parseInt(System.getProperty(DISPATCHER_SHUTDOWN_TIMEOUT_MS, DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT));

        _connection = con;
        _transacted = transacted;
        if (transacted)
        {
            _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
        }
        else
        {
            _acknowledgeMode = acknowledgeMode;
        }
        _messageEncryptionHelper = new MessageEncryptionHelper(this);
        _channelId = channelId;
        _messageFactoryRegistry = MessageFactoryRegistry.newDefaultRegistry(this);

        if (_acknowledgeMode == NO_ACKNOWLEDGE)
        {
            _prefetchHighMark = defaultPrefetchHighMark;
            _prefetchLowMark = defaultPrefetchLowMark == defaultPrefetchHighMark && defaultPrefetchHighMark > 0
                   ? Math.max(defaultPrefetchHighMark / 2, 1)
                    : defaultPrefetchLowMark;

            // we coalesce suspend jobs using single threaded pool executor with queue length of one
            // and discarding policy
            _flowControlNoAckTaskPool = new ThreadPoolExecutor(1, 1,
                                                               0L, TimeUnit.MILLISECONDS,
                                                               new LinkedBlockingQueue<Runnable>(1),
                                                               new ThreadFactory()
            {
                @Override
                public Thread newThread(final Runnable r)
                {
                    Thread thread = new Thread(r, "Connection_" + _connection.getConnectionNumber() + "_session_" + _channelId);
                    if (!thread.isDaemon())
                    {
                        thread.setDaemon(true);
                    }

                    return thread;
                }
            }, new ThreadPoolExecutor.DiscardPolicy());

            final FlowControllingBlockingQueue.ThresholdListener listener =
                    new FlowControllingBlockingQueue.ThresholdListener()
                    {
                        private final AtomicBoolean _suspendState = new AtomicBoolean();

                        public void aboveThreshold(int currentValue)
                        {
                            if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                            {
                                // Only execute change if previous state was false
                                if (!_suspendState.getAndSet(true))
                                {
                                    _logger.debug(
                                            "Above threshold ({}) so suspending channel. Current value is {}",
                                            _prefetchHighMark,
                                            currentValue);

                                    doSuspend();
                                }
                            }
                        }

                        public void underThreshold(int currentValue)
                        {
                            if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                            {
                                // Only execute change if previous state was true
                                if (_suspendState.getAndSet(false))
                                {
                                    _logger.debug(
                                            "Below threshold ({}) so unsuspending channel. Current value is {}",
                                            _prefetchLowMark,
                                            currentValue);
                                    doSuspend();
                                }
                            }
                        }

                        private void doSuspend()
                        {
                            _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspendState));
                        }
                    };
            _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, _prefetchLowMark, listener);
        }
        else
        {
            _prefetchHighMark = defaultPrefetchHighMark;
            _prefetchLowMark = defaultPrefetchLowMark;
            _flowControlNoAckTaskPool = null;
            _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, null);
        }

        // Add creation logging to tie in with the existing close logging
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Created session:" + this);
        }
    }

    // ===== JMS Session methods.

    /**
     * Closes the session with no timeout.
     *
     * @throws JMSException If the JMS provider fails to close the session due to some internal error.
     */
    @Override
    public void close() throws JMSException
    {
        close(-1);
    }

    public abstract QpidException getLastException();

    public void checkNotClosed() throws JMSException
    {
        try
        {
            super.checkNotClosed();
        }
        catch (IllegalStateException ise)
        {
            QpidException ex = getLastException();
            if (ex != null)
            {
                int code = 0;

                if (ex instanceof AMQException)
                {
                    code = ((AMQException) ex).getErrorCode();
                }

                throw JMSExceptionHelper.chainJMSException(new IllegalStateException("Session has been closed",
                                                                                     code == 0 ? null : Integer.toString(code)), ex);
            }
            else
            {
                throw ise;
            }
        }
    }

    public BytesMessage createBytesMessage() throws JMSException
    {
        checkNotClosed();
        JMSBytesMessage msg = new JMSBytesMessage(getMessageDelegateFactory());
        msg.setAMQSession(this);
        return msg;
    }

    /**
     * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
     *
     * @throws IllegalStateException If the session is closed.
     * @throws JMSException if there is a problem during acknowledge process.
     */
    public void acknowledge() throws IllegalStateException, JMSException
    {
        if (isClosed())
        {
            throw new IllegalStateException("Session is already closed");
        }
        else if (hasFailedOverDirty())
        {
            //perform an implicit recover in this scenario
            recover();

            //notify the consumer
            throw new IllegalStateException("has failed over");
        }

        try
        {
            acknowledgeImpl();
            markClean();
        }
        catch (TransportException e)
        {
            throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
        }
    }

    public void setLegacyFieldsForQueueType(AMQDestination dest)
    {
        // legacy support
        dest.setQueueName(dest.getAddressName());
        dest.setExchangeName("");
        dest.setExchangeClass("");
        dest.setRoutingKey(dest.getAMQQueueName());
    }

    public void setLegacyFieldsForTopicType(AMQDestination dest)
    {
        // legacy support
        dest.setExchangeName(dest.getAddressName());
        Node node = dest.getNode();
        dest.setExchangeClass(node.getExchangeType() == null
                                      ? ExchangeDefaults.TOPIC_EXCHANGE_CLASS
                                      : node.getExchangeType());
        dest.setRoutingKey(dest.getSubject());
    }

    protected void verifySubject(AMQDestination dest) throws QpidException
    {
        if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
        {

            if ("topic".equals(dest.getExchangeClass()))
            {
                dest.setRoutingKey("#");
                dest.setSubject(dest.getRoutingKey());
            }
            else
            {
                dest.setRoutingKey("");
                dest.setSubject("");
            }
        }
    }

    public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws QpidException;

    public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws QpidException;

    /**
     * 1. Try to resolve the address type (queue or exchange)
     * 2. if type == queue,
     *       2.1 verify queue exists or create if create == true
     *       2.2 If not throw exception
     *
     * 3. if type == exchange,
     *       3.1 verify exchange exists or create if create == true
     *       3.2 if not throw exception
     *       3.3 if exchange exists (or created) create subscription queue.
     */

    @SuppressWarnings("deprecation")
    public void resolveAddress(AMQDestination dest,
                                              boolean isConsumer,
                                              boolean noLocal) throws QpidException
    {
        if (isResolved(dest))
        {
            return;
        }
        else
        {
            boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) ||
                                 (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) ||
                                 (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER);

            boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) ||
                                 (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) ||
                                 (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER);


            int suppliedType = dest.getNode() == null ? AMQDestination.UNKNOWN_TYPE : dest.getNode().getType();
            int type = resolveAddressType(dest);
            switch (type)
            {
                case AMQDestination.QUEUE_TYPE:

                    setLegacyFieldsForQueueType(dest);
                    if(createNode)
                    {
                        handleQueueNodeCreation(dest,noLocal);
                        break;
                    }
                    else if (isQueueExist(dest,assertNode) || suppliedType == AMQDestination.QUEUE_TYPE)
                    {
                        break;
                    }
                case AMQDestination.TOPIC_TYPE:
                    if(suppliedType != AMQDestination.QUEUE_TYPE)
                    {
                        setLegacyFieldsForTopicType(dest);
                        if (createNode)
                        {
                            verifySubject(dest);
                            handleExchangeNodeCreation(dest);
                            break;
                        }
                        else if (isExchangeExist(dest, assertNode) || suppliedType == AMQDestination.TOPIC_TYPE)
                        {
                            verifySubject(dest);
                            break;
                        }
                    }
                default:
                    throw new QpidException(
                            "The name '" + dest.getAddressName() +
                            "' supplied in the address doesn't resolve to an exchange or a queue");
            }
            setResolved(dest);
        }
    }

    void setResolved(final AMQDestination dest)
    {
        _resolvedDestinations.put(dest, new WeakReference<>(dest));
    }

    void setUnresolved(final AMQDestination dest)
    {
        _resolvedDestinations.remove(dest);
    }

    private void clearResolvedDestinations()
    {
        _resolvedDestinations.clear();
    }

    boolean isResolved(final AMQDestination dest)
    {
        final WeakReference<AMQDestination> resolvedDestRef = _resolvedDestinations.get(dest);
        final AMQDestination resolvedDest = resolvedDestRef == null ? null : resolvedDestRef.get();
        if (resolvedDest == dest)
        {
            return true;
        }
        else if (resolvedDest == null)
        {
            return false;
        }

        // verify legacy fields are equal
        return Objects.equals(dest.getQueueName(), resolvedDest.getQueueName()) &&
               Objects.equals(dest.getExchangeName(), resolvedDest.getExchangeName()) &&
               Objects.equals(dest.getExchangeClass(), resolvedDest.getExchangeClass()) &&
               Objects.equals(dest.getRoutingKey(), resolvedDest.getRoutingKey()) &&
               Objects.equals(dest.getSubject(), resolvedDest.getSubject());
    }

    public abstract int resolveAddressType(AMQDestination dest) throws QpidException;

    protected abstract void acknowledgeImpl() throws JMSException;

    /**
     * Acknowledge one or many messages.
     *
     * @param deliveryTag The tag of the last message to be acknowledged.
     * @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
     *                    delivery tag, <tt>false</tt> to just acknowledge that message.
     *
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);

    /**
     * Binds the named queue, with the specified routing key, to the named exchange.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param queueName    The name of the queue to bind.
     * @param routingKey   The routing key to bind the queue with.
     * @param arguments    Additional arguments.
     * @param exchangeName The exchange to bind the queue on.
     *
     * @throws QpidException If the queue cannot be bound for any reason.
     * TODO  Be aware of possible changes to parameter order as versions change.
     * TODO  Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
     */
    public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments,
                          final String exchangeName, final AMQDestination destination) throws QpidException
    {
        bindQueue(queueName, routingKey, arguments, exchangeName, destination, false);
    }

    public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments,
                          final String exchangeName, final AMQDestination destination,
                          final boolean nowait) throws QpidException
    {
        /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
        new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>()
        {
            public Object execute() throws QpidException, FailoverException
            {
                sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait);
                return null;
            }
        }, _connection).execute();
    }

    public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws QpidException
    {
        if (consumer.getQueuename() != null)
        {
            bindQueue(consumer.getQueuename(), routingKey, new HashMap<String, Object>(), amqd.getExchangeName(), amqd);
        }
    }

    public abstract void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments,
                                       final String exchangeName, AMQDestination destination,
                                       final boolean nowait) throws QpidException, FailoverException;

    /**
     * Closes the session.
     * <p>
     * Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close
     * the channel. This is because the channel is marked as closed before the request to close it is made, so the
     * fail-over should not re-open it.
     *
     * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker.
     *
     * @throws JMSException If the JMS provider fails to close the session due to some internal error.
     * TODO  Be aware of possible changes to parameter order as versions change.
     * TODO Not certain about the logic of ignoring the failover exception, because the channel won't be
     * re-opened. May need to examine this more carefully.
     * TODO  Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
     * because the failover process sends the failover event before acquiring the mutex itself.
     */
    public void close(long timeout) throws JMSException
    {
        setClosing(true);
        lockMessageDelivery();
        try
        {
            // We must close down all producers and consumers in an orderly fashion. This is the only method
            // that can be called from a different thread of control from the one controlling the session.
            synchronized (getFailoverMutex())
            {
                close(timeout, true);
            }
        }
        finally
        {
            unlockMessageDelivery();
        }
    }

    private void close(long timeout, boolean sendClose) throws JMSException
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Closing session: " + this);
        }

        // Ensure we only try and close an open session.
        if (!setClosed())
        {
            setClosing(true);
            // we pass null since this is not an error case
            closeProducersAndConsumers(null);

            try
            {
                // If the connection is open or we are in the process
                // of closing the connection then send a cance
                // no point otherwise as the connection will be gone
                if (!_connection.isClosed() || _connection.isClosing())
                {
                    if (sendClose)
                    {
                        sendClose(timeout);
                    }
                }
            }
            catch (QpidException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing session: " + e), e);
            }
            // This is ignored because the channel is already marked as closed so the fail-over process will
            // not re-open it.
            catch (FailoverException e)
            {
                _logger.debug(
                        "Got FailoverException during channel close, ignored as channel already marked as closed.");
            }
            catch (TransportException e)
            {
                throw toJMSException("Error closing session:" + e.getMessage(), e);
            }
            finally
            {
                shutdownFlowControlNoAckTaskPool();
                _connection.deregisterSession(_channelId);
            }
        }
    }

    public abstract void sendClose(long timeout) throws QpidException, FailoverException;

    /**
     * Called when the server initiates the closure of the session unilaterally.
     *
     * @param e the exception that caused this session to be closed. Null causes the
     */
    public void closed(Throwable e) throws JMSException
    {
        // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
        // calls through connection.closeAllSessions which is also called by the public connection.close()
        // with a null cause
        // When we are closing the Session due to a protocol session error we simply create a new AMQException
        // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
        // We need to determine here if the connection should be

        if (e instanceof AMQDisconnectedException)
        {
            // Failover failed and ain't coming back. Knife the dispatcher.
            stopDispatcherThread();

       }

        //if we don't have an exception then we can perform closing operations
        setClosing(e == null);

        if (!setClosed())
        {
            // An AMQException has an error code and message already and will be passed in when closure occurs as a
            // result of a channel close request
            QpidException amqe;
            if (e instanceof QpidException)
            {
                amqe = (QpidException) e;
            }
            else
            {
                amqe = new QpidException("Closing session forcibly", e);
            }

            _connection.deregisterSession(_channelId);
            closeProducersAndConsumers(amqe);
            shutdownFlowControlNoAckTaskPool();
        }

    }

    protected void stopDispatcherThread()
    {
        if (_dispatcherThread != null)
        {
            _dispatcherThread.interrupt();
        }
    }
    /**
     * Commits all messages done in this transaction and releases any locks currently held.
     * <p>
     * If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
     * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
     * The client will be unable to determine whether or not the commit actually happened on the broker in this case.
     *
     * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
     *                      not mean that the commit is known to have failed, merely that it is not known whether it
     *                      failed or not.
     */
    public void commit() throws JMSException
    {
        checkTransacted();

        //Check that we are clean to commit.
        if (_failedOverDirty)
        {
            if (_logger.isDebugEnabled())
            {
                _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
            }
            rollback();

            throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
                                                     "The session transaction was rolled back.");
        }

        try
        {
            commitImpl();
            markClean();
        }
        catch (QpidException e)
        {
            throw toJMSException("Exception during commit: " + e.getMessage() + (e.getCause() == null ? "" : " (Caused by : " + e.getCause() + ")"), e);
        }
        catch (FailoverException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException(
                    "Fail-over interrupted commit. Status of the commit is uncertain."), e);
        }
        catch(TransportException e)
        {
            throw toJMSException("Session exception occurred while trying to commit: " + e.getMessage(), e);
        }
    }

    protected abstract void commitImpl() throws QpidException, FailoverException, TransportException;




    public void confirmConsumerCancelled(String consumerTag)
    {
        C consumer = _consumers.get(consumerTag);
        if (consumer != null)
        {
            if (!consumer.isBrowseOnly())  // Normal Consumer
            {
                // Clean the Maps up first
                // Flush any pending messages for this consumerTag
                if (_dispatcher != null)
                {
                    _logger.debug("Dispatcher is not null");
                }
                else
                {
                    _logger.debug("Dispatcher is null so created stopped dispatcher");
                    startDispatcherIfNecessary(true);
                }

                rejectPending(consumer);
            }
            else // Queue Browser
            {
                // Just close the consumer
                // fixme  the CancelOK is being processed before the arriving messages..
                // The dispatcher is still to process them so the server sent in order but the client
                // has yet to receive before the close comes in

                if (consumer.isAutoClose())
                {
                    // There is a small window where the message is between the two queues in the dispatcher.
                    if (consumer.isClosed())
                    {
                        if (_logger.isDebugEnabled())
                        {
                            _logger.debug("Closing consumer:" + consumer.debugIdentity());
                        }

                        deregisterConsumer(consumer);
                    }
                    else
                    {
                        _queue.add(new CloseConsumerMessage(consumer));
                    }
                }
            }
        }
    }

    private void rejectPending(C consumer)
    {
        // Reject messages on pre-receive queue
        consumer.releasePendingMessages();

        // Reject messages on pre-dispatch queue
        rejectMessagesForConsumerTag(consumer.getConsumerTag());

        // closeConsumer
        consumer.markClosed();
    }

    public QueueBrowser createBrowser(Queue queue) throws JMSException
    {
        if (isStrictAMQP())
        {
            throw new UnsupportedOperationException();
        }

        return createBrowser(queue, null);
    }

    /**
     * Create a queue browser if the destination is a valid queue.
     */
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
    {
        if (isStrictAMQP())
        {
            throw new UnsupportedOperationException();
        }

        checkNotClosed();
        checkValidQueue(queue);

        return new AMQQueueBrowser(this, queue, messageSelector);
    }

    protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
            throws JMSException
    {
        checkValidDestination(destination);

        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false,
                                  messageSelector, null, true, true);
    }

    public MessageConsumer createConsumer(Destination destination) throws JMSException
    {
        checkValidDestination(destination);

        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
                                  isBrowseOnlyDestination(destination), false);
    }

    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
    {
        checkValidDestination(destination);

        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
    }

    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
            throws JMSException
    {
        checkValidDestination(destination);

        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
    }

    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
                                          String selector) throws JMSException
    {
        checkValidDestination(destination);

        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
    }

    public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
                                              boolean exclusive, String selector) throws JMSException
    {
        checkValidDestination(destination);

        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
    }

    public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
                                          boolean exclusive, String selector, Map<String,Object> rawSelector) throws JMSException
    {
        checkValidDestination(destination);

        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
                                  false);
    }

    public  TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
    {
        // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method
        return createDurableSubscriber(topic, name, null, false);
    }

    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
            throws JMSException
    {
        checkNotClosed();
        Topic origTopic = checkValidTopic(topic, true);

        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
        if (dest.getDestSyntax() == DestSyntax.ADDR && !isResolved(dest))
        {
            try
            {
                resolveAddress(dest,false,noLocal);
                if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)
                {
                    throw new JMSException("Durable subscribers can only be created for Topics");
                }
            }
            catch(QpidException e)
            {
                throw toJMSException("Error when verifying destination",e);
            }
            catch(TransportException e)
            {
                throw toJMSException("Error when verifying destination", e);
            }
        }

        String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;

        _subscriberDetails.lock();
        try
        {
            TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);

            // Not subscribed to this name in the current session
            if (subscriber == null)
            {
                // After the address is resolved routing key will not be null.
                String topicName = dest.getRoutingKey();

                if (_strictAMQP)
                {
                    if (_strictAMQPFATAL)
                    {
                        throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
                    }
                    else
                    {
                        _logger.warn("Unable to determine if subscription already exists for '" + topicName
                                        + "' for creation durableSubscriber. Requesting queue deletion regardless.");
                    }

                    deleteQueue(dest.getAMQQueueName());
                }
                else
                {
                    Map<String,Object> args = new HashMap<String,Object>();

                    // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
                    // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
                    // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
                    // argument, as specifying null for the arguments when querying means they should not be checked at all
                    args.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
                    if(noLocal)
                    {
                        args.put(AMQPFilterTypes.NO_LOCAL.getValue(), true);
                    }

                    // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
                    // says we must trash the subscription.
                    boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
                    boolean isQueueBoundForTopicAndSelector =
                                isQueueBound(dest.getExchangeName(),
                                             dest.getAMQQueueName(),
                                             topicName, args);

                    if (isQueueBound && !isQueueBoundForTopicAndSelector)
                    {
                        deleteQueue(dest.getAMQQueueName());
                    }
                    else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match
                    {
                        try
                        {
                            bindQueue(dest.getAMQQueueName(),
                                      dest.getRoutingKey(),
                                      args,
                                      dest.getExchangeName(),
                                      dest,
                                      true);
                        }
                        catch(QpidException e)
                        {
                            throw toJMSException("Error when checking binding",e);
                        }
                    }
                }
            }
            else
            {
                // Subscribed with the same topic and no current / previous or same selector
                if (subscriber.getTopic().equals(topic)
                    && ((messageSelector == null && subscriber.getMessageSelector() == null)
                            || (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector()))))
                {
                    throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name
                            + (messageSelector != null ? " and selector " + messageSelector : ""));
                }
                else
                {
                    unsubscribe(name, true);
                }

            }

            _subscriberAccess.lock();
            try
            {
                C consumer = (C) createConsumer(dest, messageSelector, noLocal);
                consumer.markAsDurableSubscriber();
                subscriber = new TopicSubscriberAdaptor<C>(dest, consumer);

                // Save subscription information
                _subscriptions.put(name, subscriber);
                _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
            }
            finally
            {
                _subscriberAccess.unlock();
            }

            return subscriber;
        }
        catch (TransportException e)
        {
            throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
        }
        finally
        {
            _subscriberDetails.unlock();
        }
    }

    public ListMessage createListMessage() throws JMSException
    {
        checkNotClosed();
        AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
        msg.setAMQSession(this);
        return msg;
    }

    public MapMessage createMapMessage() throws JMSException
    {
        checkNotClosed();
        if (_useAMQPEncodedMapMessage)
        {
            AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory());
            msg.setAMQSession(this);
            return msg;
        }
        else
        {
            JMSMapMessage msg = new JMSMapMessage(getMessageDelegateFactory());
            msg.setAMQSession(this);
            return msg;
        }
    }

    public javax.jms.Message createMessage() throws JMSException
    {
        return createBytesMessage();
    }

    public ObjectMessage createObjectMessage() throws JMSException
    {
        checkNotClosed();
         JMSObjectMessage msg = new JMSObjectMessage(getAMQConnection(), getMessageDelegateFactory());
         msg.setAMQSession(this);
         return msg;
    }

    public ObjectMessage createObjectMessage(Serializable object) throws JMSException
    {
        ObjectMessage msg = createObjectMessage();
        msg.setObject(object);

        return msg;
    }

    public P createProducer(Destination destination) throws JMSException
    {
        return createProducerImpl(destination, null, null);
    }

    public P createProducer(Destination destination, boolean immediate) throws JMSException
    {
        return createProducerImpl(destination, null, immediate);
    }

    public P createProducer(Destination destination, boolean mandatory, boolean immediate)
            throws JMSException
    {
        return createProducerImpl(destination, mandatory, immediate);
    }

    public TopicPublisher createPublisher(Topic topic) throws JMSException
    {
        checkNotClosed();

        return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
    }

    public Queue createQueue(String queueName) throws JMSException
    {
        checkNotClosed();
        try
        {
            if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1)
            {
                DestSyntax syntax = AMQDestination.getDestType(queueName);
                if (syntax == AMQDestination.DestSyntax.BURL)
                {
                    // For testing we may want to use the prefix
                    return new AMQQueue(getDefaultQueueExchangeName(),
                                        AMQDestination.stripSyntaxPrefix(queueName));
                }
                else
                {
                    AMQQueue queue = new AMQQueue(queueName);
                    return queue;

                }
            }
            else
            {
                return new AMQQueue(queueName);
            }
        }
        catch (URISyntaxException urlse)
        {
            _logger.error("", urlse);
            throw JMSExceptionHelper.chainJMSException(new JMSException(urlse.getReason()), urlse);
        }

    }

    /**
     * Declares the named queue.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param name       The name of the queue to declare.
     * @param autoDelete
     * @param durable    Flag to indicate that the queue is durable.
     * @param exclusive  Flag to indicate that the queue is exclusive to this client.
     *
     * @throws QpidException If the queue cannot be declared for any reason.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    public void createQueue(final String name, final boolean autoDelete, final boolean durable,
                            final boolean exclusive) throws QpidException
    {
        createQueue(name, autoDelete, durable, exclusive, null);
    }

    /**
     * Declares the named queue.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param name       The name of the queue to declare.
     * @param autoDelete
     * @param durable    Flag to indicate that the queue is durable.
     * @param exclusive  Flag to indicate that the queue is exclusive to this client.
     * @param arguments  Arguments used to set special properties of the queue
     *
     * @throws QpidException If the queue cannot be declared for any reason.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    public void createQueue(final String name, final boolean autoDelete, final boolean durable,
                            final boolean exclusive, final Map<String, Object> arguments) throws QpidException
    {
        new FailoverRetrySupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>()
        {
            public Object execute() throws QpidException, FailoverException
            {
                sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
                return null;
            }
        }, _connection).execute();
    }

    public abstract void sendCreateQueue(String name, final boolean autoDelete, final boolean durable,
                                         final boolean exclusive, final Map<String, Object> arguments) throws
                                                                                                       QpidException, FailoverException;

    /**
     * Creates a QueueReceiver
     *
     * @param destination
     *
     * @return QueueReceiver - a wrapper around our MessageConsumer
     *
     * @throws JMSException
     */
    public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
    {
        checkValidDestination(destination);
        Queue dest = validateQueue(destination);
        C consumer = (C) createConsumer(dest);
        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
        return new QueueReceiverAdaptor(dest, consumer);
    }

    /**
     * Creates a QueueReceiver using a message selector
     *
     * @param destination
     * @param messageSelector
     *
     * @return QueueReceiver - a wrapper around our MessageConsumer
     *
     * @throws JMSException
     */
    public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
    {
        checkValidDestination(destination);
        Queue dest = validateQueue(destination);
        C consumer = (C) createConsumer(dest, messageSelector);
        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
        return new QueueReceiverAdaptor(dest, consumer);
    }

    /**
     * Creates a QueueReceiver wrapping a MessageConsumer
     *
     * @param queue
     *
     * @return QueueReceiver
     *
     * @throws JMSException
     */
    public QueueReceiver createReceiver(Queue queue) throws JMSException
    {
        checkNotClosed();
        Queue dest = validateQueue(queue);
        C consumer = (C) createConsumer(dest);
        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
        return new QueueReceiverAdaptor(dest, consumer);
    }

    /**
     * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
     *
     * @param queue
     * @param messageSelector
     *
     * @return QueueReceiver
     *
     * @throws JMSException
     */
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
    {
        checkNotClosed();
        Queue dest = validateQueue(queue);
        C consumer = (C) createConsumer(dest, messageSelector);
        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
        return new QueueReceiverAdaptor(dest, consumer);
    }

    private Queue validateQueue(Destination dest) throws InvalidDestinationException
    {
        if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
        {
            return (Queue)dest;
        }
        else
        {
            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
        }
    }

    public QueueSender createSender(Queue queue) throws JMSException
    {
        checkNotClosed();

        return new QueueSenderAdapter(createProducer(queue), queue);
    }

    public StreamMessage createStreamMessage() throws JMSException
    {
        checkNotClosed();
        if (_useAMQPEncodedStreamMessage)
        {
            AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
            msg.setAMQSession(this);
            return msg;
        }
        else
        {
            JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory());
            msg.setAMQSession(this);
            return msg;
        }
    }

    /**
     * Creates a non-durable subscriber
     *
     * @param topic
     *
     * @return TopicSubscriber - a wrapper round our MessageConsumer
     *
     * @throws JMSException
     */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
    {
        checkNotClosed();
        checkValidTopic(topic);

        return new TopicSubscriberAdaptor<C>(topic,
                createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
    }

    /**
     * Creates a non-durable subscriber with a message selector
     *
     * @param topic
     * @param messageSelector
     * @param noLocal
     *
     * @return TopicSubscriber - a wrapper round our MessageConsumer
     *
     * @throws JMSException
     */
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
    {
        checkNotClosed();
        checkValidTopic(topic);

        return new TopicSubscriberAdaptor<C>(topic,
                                             createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
                                                                true, messageSelector, null, false, false));
    }

    public TemporaryQueue createTemporaryQueue() throws JMSException
    {
        checkNotClosed();
        try
        {
            AMQTemporaryQueue result = new AMQTemporaryQueue(this);

            // this is done so that we can produce to a temporary queue before we create a consumer
            result.setQueueName(result.getRoutingKey());
            Map<String, Object> args;
            if(_connection.getDelegate().isQueueLifetimePolicySupported())
            {
                args = new HashMap<>();
                args.put("qpid.lifetime_policy", "DELETE_ON_CONNECTION_CLOSE");
                args.put("qpid.exclusivity_policy", "CONNECTION");
            }
            else
            {
                args = null;
            }
            createQueue(result.getAMQQueueName(), result.isAutoDelete(),
                        result.isDurable(), result.isExclusive(),
                        args);
            bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
                    new HashMap<String, Object>(), result.getExchangeName(), result);
            return result;
        }
        catch (QpidException e)
        {
           throw toJMSException("Cannot create temporary queue",e);
        }
        catch(TransportException e)
        {
            throw toJMSException("Cannot create temporary queue: " + e.getMessage(), e);
        }
        catch(Exception e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException("Cannot create temporary queue: "
                                                                        + e.getMessage()), e);
        }
    }

    public TemporaryTopic createTemporaryTopic() throws JMSException
    {
        checkNotClosed();

        return new AMQTemporaryTopic(this);
    }

    public TextMessage createTextMessage() throws JMSException
    {
        checkNotClosed();

        JMSTextMessage msg = new JMSTextMessage(getMessageDelegateFactory());
        msg.setAMQSession(this);
        return msg;
    }

    protected Object getFailoverMutex()
    {
        return _connection.getFailoverMutex();
    }

    public TextMessage createTextMessage(String text) throws JMSException
    {

        TextMessage msg = createTextMessage();
        msg.setText(text);

        return msg;
    }

    public Topic createTopic(String topicName) throws JMSException
    {
        checkNotClosed();
        try
        {
            if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)
            {
                DestSyntax syntax = AMQDestination.getDestType(topicName);
                // for testing we may want to use the prefix to indicate our choice.
                topicName = AMQDestination.stripSyntaxPrefix(topicName);
                if (syntax == AMQDestination.DestSyntax.BURL)
                {
                    return new AMQTopic(getDefaultTopicExchangeName(), topicName);
                }
                else
                {
                    return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName);
                }
            }
            else
            {
                return new AMQTopic(topicName);
            }

        }
        catch (URISyntaxException urlse)
        {
            _logger.error("", urlse);
            throw JMSExceptionHelper.chainJMSException(new JMSException(urlse.getReason()), urlse);
        }
    }

    public void declareExchange(String name, String type, boolean nowait) throws QpidException
    {
        declareExchange(name, type, nowait, false, false, false);
    }

    @Override
    public void deleteExchange(final String exchangeName) throws JMSException
    {
        try
        {
            new FailoverRetrySupport<>(new FailoverProtectedOperation<Object, QpidException>()
            {
                public Object execute() throws QpidException, FailoverException
                {
                    sendExchangeDelete(exchangeName, false);
                    return null;
                }
            }, _connection).execute();
        }
        catch (QpidException e)
        {
            throw toJMSException("The exchange deletion failed: " + e.getMessage(), e);
        }
    }

    abstract void sendExchangeDelete(final String name, final boolean nowait) throws QpidException, FailoverException;

    abstract public void sync() throws QpidException;

    public int getAcknowledgeMode()
    {
        return _acknowledgeMode;
    }

    public AMQConnection getAMQConnection()
    {
        return _connection;
    }

    public int getChannelId()
    {
        return _channelId;
    }

    public int getDefaultPrefetch()
    {
        return _prefetchHighMark;
    }

    public int getDefaultPrefetchHigh()
    {
        return _prefetchHighMark;
    }

    public int getDefaultPrefetchLow()
    {
        return _prefetchLowMark;
    }

    public int getPrefetch()
    {
        return _prefetchHighMark;
    }

    public String getDefaultQueueExchangeName()
    {
        return _connection.getDefaultQueueExchangeName();
    }

    public String getDefaultTopicExchangeName()
    {
        return _connection.getDefaultTopicExchangeName();
    }

    public MessageListener getMessageListener() throws JMSException
    {
        return _messageListener;
    }

    public String getTemporaryQueueExchangeName()
    {
        return _connection.getTemporaryQueueExchangeName();
    }

    public String getTemporaryTopicExchangeName()
    {
        return _connection.getTemporaryTopicExchangeName();
    }

    public int getTicket()
    {
        return _ticket;
    }

    /**
     * Indicates whether the session is in transacted mode.
     *
     * @return true if the session is in transacted mode
     * @throws IllegalStateException - if session is closed.
     */
    public boolean getTransacted() throws JMSException
    {
        // Sun TCK checks that javax.jms.IllegalStateException is thrown for closed session
        // nowhere else this behavior is documented
        checkNotClosed();
        return _transacted;
    }

    /**
     * Indicates whether the session is in transacted mode.
     */
    public boolean isTransacted()
    {
        return _transacted;
    }

    public boolean hasConsumer(Destination destination)
    {
        AtomicInteger counter = _destinationConsumerCount.get(destination);

        return (counter != null) && (counter.get() != 0);
    }

    /** Indicates that warnings should be generated on violations of the strict AMQP. */
    public boolean isStrictAMQP()
    {
        return _strictAMQP;
    }

    public boolean isSuspended()
    {
        return _suspended;
    }

    protected void addUnacknowledgedMessage(long id)
    {
        _unacknowledgedMessageTags.add(id);
    }

    protected void addDeliveredMessage(long id)
    {
        _deliveredMessageTags.add(id);
    }

    /**
     * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
     * the queue read by the dispatcher.
     *
     * @param message the message that has been received
     */
    public void messageReceived(UnprocessedMessage message)
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Message[" + message.toString() + "] received in session");
        }
        _highestDeliveryTag.set(message.getDeliveryTag());
        _queue.add(message);
    }

    public void declareAndBind(AMQDestination amqd)
            throws
            QpidException
    {
        declareAndBind(amqd, new HashMap<String,Object>());
    }


    public void declareAndBind(AMQDestination amqd, Map<String,Object> arguments)
            throws
            QpidException
    {
        declareExchange(amqd, false);
        String queueName = declareQueue(amqd, false);
        bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd);
    }

    /**
     * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
     * <p>
     * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges
     * all messages that have been delivered to the client.
     * <p>
     * Restarting a session causes it to take the following actions:
     *
     * <ul>
     * <li>Stop message delivery.</li>
     * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
     * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
     * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
     * </ul>
     *
     * <p>
     * If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
     * receiving acknowledgment that it has then a JMSException will be thrown. In this case it will not be possible
     * for the client to determine whether the broker is going to recover the session or not.
     *
     * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
     *                      Not that this does not necessarily mean that the recovery has failed, but simply that it is
     *                      not possible to tell if it has or not.
     * TODO  Be aware of possible changes to parameter order as versions change.
     *
     * Strategy for handling recover.
     * Flush any acks not yet sent.
     * Stop the message flow.
     * Clear the dispatch queue and the consumer queues.
     * Release/Reject all messages received but not yet acknowledged.
     * Start the message flow.
     */
    public void recover() throws JMSException
    {
        // Ensure that the session is open.
        checkNotClosed();

        // Ensure that the session is not transacted.
        checkNotTransacted();


        try
        {
            // flush any acks we are holding in the buffer.
            flushAcknowledgments();

            // this is only set true here, and only set false when the consumers preDeliver method is called
            _sessionInRecovery = true;

            boolean isSuspended = isSuspended();

            if (!isSuspended)
            {
                suspendChannel(true);
            }

            // Set to true to short circuit delivery of anything currently
            //in the pre-dispatch queue.
            _usingDispatcherForCleanup = true;

            syncDispatchQueue(false);

            // Set to false before sending the recover as 0-8/9/9-1 will
            //send messages back before the recover completes, and we
            //probably shouldn't clean those! ;-)
            _usingDispatcherForCleanup = false;

            if (_dispatcher != null)
            {
                _dispatcher.recover();
            }

            sendRecover();

            markClean();

            if (!isSuspended)
            {
                suspendChannel(false);
            }
        }
        catch (QpidException e)
        {
            throw toJMSException("Recover failed: " + e.getMessage(), e);
        }
        catch (FailoverException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException(
                    "Recovery was interrupted by fail-over. Recovery status is not known."), e);
        }
        catch(TransportException e)
        {
            throw toJMSException("Recover failed: " + e.getMessage(), e);
        }
    }

    protected abstract void sendRecover() throws QpidException, FailoverException;

    protected abstract void flushAcknowledgments();

    public void rejectMessage(UnprocessedMessage message, boolean requeue)
    {

        if (_logger.isDebugEnabled())
        {
            _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag());
        }

        rejectMessage(message.getDeliveryTag(), requeue);
    }

    public void rejectMessage(AbstractJMSMessage message, boolean requeue)
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
        }

        rejectMessage(message.getDeliveryTag(), requeue);

    }

    public abstract void rejectMessage(long deliveryTag, boolean requeue);

    /**
     * Commits all messages done in this transaction and releases any locks currently held.
     * <p>
     * If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the
     * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
     * The client will be unable to determine whether or not the rollback actually happened on the broker in this case.
     *
     * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
     *                      not mean that the rollback is known to have failed, merely that it is not known whether it
     *                      failed or not.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    public void rollback() throws JMSException
    {
        synchronized (_suspensionLock)
        {
            checkTransacted();

            try
            {
                boolean isSuspended = isSuspended();

                if (!isSuspended)
                {
                    suspendChannel(true);
                }

                setRollbackMark();

                syncDispatchQueue(false);

                _dispatcher.rollback();

                releaseForRollback();

                sendRollback();

                markClean();

                if (!isSuspended)
                {
                    suspendChannel(false);
                }
            }
            catch (QpidException e)
            {
                throw toJMSException("Failed to rollback: " + e, e);
            }
            catch (FailoverException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException(
                        "Fail-over interrupted rollback. Status of the rollback is uncertain."), e);
            }
            catch (TransportException e)
            {
                throw toJMSException("Failure to rollback:" + e.getMessage(), e);
            }
        }
    }

    public abstract void releaseForRollback();

    public abstract void sendRollback() throws QpidException, FailoverException;

    public void run()
    {
        throw new java.lang.UnsupportedOperationException();
    }

    public void setMessageListener(MessageListener listener) throws JMSException
    {
    }

    /**
     * @see #unsubscribe(String, boolean)
     */
    public void unsubscribe(String name) throws JMSException
    {
        try
        {
            unsubscribe(name, false);
        }
        catch (TransportException e)
        {
            throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
        }
    }

    /**
     * Unsubscribe from a subscription.
     *
     * @param name the name of the subscription to unsubscribe
     * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the
     * queue is not bound, possibly due to the subscription being closed.
     * @throws JMSException on
     * @throws InvalidDestinationException
     */
    private void unsubscribe(String name, boolean safe) throws JMSException
    {
        TopicSubscriberAdaptor<C> subscriber;

        _subscriberDetails.lock();
        try
        {
            checkNotClosed();
            subscriber = _subscriptions.get(name);
            if (subscriber != null)
            {
                // Remove saved subscription information
                _subscriptions.remove(name);
                _reverseSubscriptionMap.remove(subscriber.getMessageConsumer());
            }
        }
        finally
        {
            _subscriberDetails.unlock();
        }

        if (subscriber != null)
        {
            subscriber.close();

            // send a queue.delete for the subscription
            deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
        }
        else
        {
            if (_strictAMQP)
            {
                if (_strictAMQPFATAL)
                {
                    throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
                }
                else
                {
                    _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
                                 + " Requesting queue deletion regardless.");
                }

                deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
            }
            else // Queue Browser
            {

                if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
                {
                    deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
                }
                else if (!safe)
                {
                    throw new InvalidDestinationException("Unknown subscription name: " + name);
                }
            }
        }
    }

    protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
                                   final int prefetchLow, final boolean noLocal,
                                   final boolean exclusive, String selector, final Map<String,Object> rawSelector,
                                   final boolean noConsume, final boolean autoClose) throws JMSException
    {
        checkTemporaryDestination(destination);

        if(!noConsume && isBrowseOnlyDestination(destination))
        {
            throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
                                                  "but a 'browseOnly' Destination has been supplied.");
        }

        final String messageSelector;

        if (_strictAMQP && !((selector == null) || selector.equals("")))
        {
            if (_strictAMQPFATAL)
            {
                throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
            }
            else
            {
                messageSelector = null;
            }
        }
        else
        {
            messageSelector = selector;
        }

        return new FailoverRetrySupport<C, JMSException>(
                new FailoverProtectedOperation<C, JMSException>()
                {
                    public C execute() throws JMSException, FailoverException
                    {
                        checkNotClosed();

                        AMQDestination amqd = (AMQDestination) destination;

                        C consumer;
                        try
                        {
                            consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
                                                             noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
                        }
                        catch(TransportException e)
                        {
                            throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
                        }

                        if (_messageListener != null)
                        {
                            consumer.setMessageListener(_messageListener);
                        }

                        try
                        {
                            registerConsumer(consumer, false);
                        }
                        catch (AMQInvalidArgumentException ise)
                        {
                            throw JMSExceptionHelper.chainJMSException(new InvalidSelectorException(ise.getMessage()),
                                                                       ise);
                        }
                        catch (AMQInvalidRoutingKeyException e)
                        {
                            throw JMSExceptionHelper.chainJMSException(new InvalidDestinationException(
                                    "Invalid routing key:"
                                    + amqd.getRoutingKey()
                            ), e);
                        }
                        catch (QpidException e)
                        {
                            if (e instanceof AMQChannelClosedException)
                            {
                                close(-1, false);
                            }

                            throw toJMSException("Error registering consumer: " + e,e);
                        }
                        catch (TransportException e)
                        {
                            throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
                        }
                        return consumer;
                    }
                }, _connection).execute();
    }

    public abstract C createMessageConsumer(final AMQDestination destination,
                                            final int prefetchHigh,
                                            final int prefetchLow,
                                            final boolean noLocal,
                                            final boolean exclusive, String selector,
                                            final Map<String,Object> arguments,
                                            final boolean noConsume,
                                            final boolean autoClose) throws JMSException;

    /**
     * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
     * instance.
     *
     * @param consumer the consum
     */
    void deregisterConsumer(C consumer)
    {
        if (_consumers.remove(consumer.getConsumerTag()) != null)
        {
            _subscriberAccess.lock();
            try
            {
                String subscriptionName = _reverseSubscriptionMap.remove(consumer);
                if (subscriptionName != null)
                {
                    _subscriptions.remove(subscriptionName);
                }
            }
            finally
            {
                _subscriberAccess.unlock();
            }

            Destination dest = consumer.getDestination();
            synchronized (dest)
            {
                // Provide additional NPE check
                // This would occur if the consumer was closed before it was
                // fully opened.
                if (_destinationConsumerCount.get(dest) != null)
                {
                    if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
                    {
                        _destinationConsumerCount.remove(dest);
                    }
                }
            }
        }
    }

    void deregisterProducer(long producerId)
    {
        _producers.remove(producerId);
    }

    boolean isInRecovery()
    {
        return _sessionInRecovery;
    }

    boolean isQueueBound(String exchangeName, String queueName) throws JMSException
    {
        return isQueueBound(exchangeName, queueName, null);
    }

    /**
     * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param exchangeName The exchange name to test for binding against.
     * @param queueName    The queue name to check if bound.
     * @param routingKey   The routing key to check if the queue is bound under.
     *
     * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
     *
     * @throws JMSException If the query fails for any reason.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    public abstract boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey)
            throws JMSException;

    public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;

    public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException;

    /**
     * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
     * when the client has vetoed resubscription.
     */
    void markClosed()
    {
        setClosed();
        _connection.deregisterSession(_channelId);
        markClosedProducersAndConsumers();

    }


    void syncDispatchQueue(final boolean holdDispatchLock)
    {
        if (Thread.currentThread() == _dispatcherThread || holdDispatchLock)
        {
            while (!super.isClosed() && !_queue.isEmpty())
            {
                Dispatchable disp;
                try
                {
                    disp = _queue.take();
                }
                catch (InterruptedException e)
                {
                    throw new RuntimeException(e);
                }

                // Check just in case _queue becomes empty, it shouldn't but
                // better than an NPE.
                if (disp == null)
                {
                    _logger.debug("_queue became empty during sync.");
                    break;
                }

                disp.dispatch(AMQSession.this);
            }
        }
        else
        {
            startDispatcherIfNecessary();

            final CountDownLatch signal = new CountDownLatch(1);

            _queue.add(new Dispatchable()
            {
                public void dispatch(AMQSession ssn)
                {
                    signal.countDown();
                }
            });

            try
            {
                signal.await();
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }
    }

    void drainDispatchQueue()
    {
        if (Thread.currentThread() == _dispatcherThread)
        {
            while (!super.isClosed() && !_queue.isEmpty())
            {
                Dispatchable disp;
                try
                {
                    disp = _queue.take();
                }
                catch (InterruptedException e)
                {
                    throw new RuntimeException(e);
                }

                // Check just in case _queue becomes empty, it shouldn't but
                // better than an NPE.
                if (disp == null)
                {
                    _logger.debug("_queue became empty during sync.");
                    break;
                }

                disp.dispatch(AMQSession.this);
            }
        }
        else
        {
            startDispatcherIfNecessary(false);

            final CountDownLatch signal = new CountDownLatch(1);

            _queue.add(new Dispatchable()
            {
                public void dispatch(AMQSession ssn)
                {
                    signal.countDown();
                }
            });

            try
            {
                signal.await();
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Resubscribes all producers and consumers. This is called when performing failover.
     *
     * @throws QpidException
     */
    void resubscribe() throws QpidException
    {
        if (_dirty)
        {
            _failedOverDirty = true;
        }

        // Also reset the delivery tag tracker, to insure we dont
        // return the first <total number of msgs received on session>
        // messages sent by the brokers following the first rollback
        // after failover
        resetRollbackMarkers();

        _unacknowledgedMessageTags.clear();
        _prefetchedMessageTags.clear();

        clearResolvedDestinations();
        resubscribeProducers();
        resubscribeConsumers();
    }

    void resetRollbackMarkers()
    {
        _highestDeliveryTag.set(-1);
        _rollbackMark.set(-1);
    }

    void setHasMessageListeners()
    {
        _hasMessageListeners = true;
    }

    void setInRecovery(boolean inRecovery)
    {
        _sessionInRecovery = inRecovery;
    }

    boolean isStarted()
    {
        return _startedAtLeastOnce.get();
    }

    /**
     * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
     *
     * @throws QpidException If the session cannot be started for any reason.
     * TODO  This should be controlled by _stopped as it pairs with the stop method fixme or check the
     * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
     * for each subsequent call to flow.. only need to do this if we have called stop.
     */
    void start() throws QpidException
    {
        // Check if the session has previously been started and suspended, in which case it must be unsuspended.
        if (_startedAtLeastOnce.getAndSet(true))
        {
            suspendChannel(false);
        }

        // If the event dispatcher is not running then start it too.
        if (hasMessageListeners())
        {
            startDispatcherIfNecessary();
        }
    }

    void startDispatcherIfNecessary()
    {
        //If we are the dispatcher then we don't need to check we are started
        if (Thread.currentThread() == _dispatcherThread)
        {
            return;
        }

        // If IMMEDIATE_PREFETCH is not set then we need to start fetching
        // This is final per session so will be multi-thread safe.
        if (!_immediatePrefetch)
        {
            // We do this now if this is the first call on a started connection
            if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
            {
                try
                {
                    suspendChannel(false);
                }
                catch (QpidException e)
                {
                    _logger.info("Unsuspending channel threw an exception:", e);
                }
            }
        }

        startDispatcherIfNecessary(false);
    }

    synchronized void startDispatcherIfNecessary(boolean initiallyStopped)
    {
        if (_dispatcher == null)
        {
            _dispatcher = new Dispatcher();
            try
            {
                _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);

            }
            catch(Exception e)
            {
                throw new Error("Error creating Dispatcher thread",e);
            }

            String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber();

            _dispatcherThread.setName(dispatcherThreadName);
            _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD);
            _dispatcher.setConnectionStopped(initiallyStopped);
            _dispatcherThread.start();
            if (_dispatcherLogger.isDebugEnabled())
            {
                _dispatcherLogger.debug(_dispatcherThread.getName() + " created");
            }
        }
        else
        {
            _dispatcher.setConnectionStopped(initiallyStopped);
        }
    }

    void stop() throws QpidException
    {
        // Stop the server delivering messages to this session.
        suspendChannelIfNotClosing();
        stopExistingDispatcher();
    }

    private void checkNotTransacted() throws JMSException
    {
        if (getTransacted())
        {
            throw new IllegalStateException("Session is transacted");
        }
    }

    private void checkTemporaryDestination(Destination destination) throws JMSException
    {
        if ((destination instanceof TemporaryDestination))
        {
            _logger.debug("destination is temporary");
            final TemporaryDestination tempDest = (TemporaryDestination) destination;
            if (tempDest.getSession().getAMQConnection() != this.getAMQConnection())
            {
                _logger.debug("destination is on different conection");
                throw new JMSException("Cannot consume from a temporary destination created on another connection");
            }

            if (tempDest.isDeleted())
            {
                _logger.debug("destination is deleted");
                throw new JMSException("Cannot consume from a deleted destination");
            }
        }
    }

    protected void checkTransacted() throws JMSException
    {
        if (!getTransacted())
        {
            throw new IllegalStateException("Session is not transacted");
        }
    }

    private void checkValidDestination(Destination destination) throws InvalidDestinationException
    {
        if (destination == null)
        {
            throw new javax.jms.InvalidDestinationException("Invalid Queue");
        }
    }

    private void checkValidQueue(Queue queue) throws InvalidDestinationException
    {
        if (queue == null)
        {
            throw new javax.jms.InvalidDestinationException("Invalid Queue");
        }
    }

    /*
     * I could have combined the last 3 methods, but this way it improves readability
     */
    protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
    {
        if (topic == null)
        {
            throw new javax.jms.InvalidDestinationException("Invalid Topic");
        }

        if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this))
        {
            throw new javax.jms.InvalidDestinationException(
                    "Cannot create a subscription on a temporary topic created in another session");
        }

        if ((topic instanceof TemporaryDestination) && durable)
        {
            throw new javax.jms.InvalidDestinationException
                ("Cannot create a durable subscription with a temporary topic: " + topic);
        }

        if (!(topic instanceof AMQDestination))
        {
            throw new javax.jms.InvalidDestinationException(
                    "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
                    + topic.getClass().getName());
        }

        return topic;
    }

    protected Topic checkValidTopic(Topic topic) throws JMSException
    {
        return checkValidTopic(topic, false);
    }

    /**
     * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
     *
     * @param error not null if this is a result of an error occurring at the connection level
     */
    private void closeConsumers(Throwable error) throws JMSException
    {
        // we need to clone the list of consumers since the close() method updates the _consumers collection
        // which would result in a concurrent modification exception
        final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());

        final Iterator<C> it = clonedConsumers.iterator();
        while (it.hasNext())
        {
            final C con = it.next();
            if (error != null)
            {
                con.notifyError(error);
            }
            else
            {
                con.close(false);
            }
        }
        // at this point the _consumers map will be empty
        if (_dispatcher != null)
        {
            _dispatcher.close();
            _dispatcher = null;
        }
    }

    /**
     * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
     * currently no way of propagating errors to message producers (this is a JMS limitation).
     */
    private void closeProducers() throws JMSException
    {
        // we need to clone the list of producers since the close() method updates the _producers collection
        // which would result in a concurrent modification exception
        final ArrayList clonedProducers = new ArrayList(_producers.values());

        final Iterator it = clonedProducers.iterator();
        while (it.hasNext())
        {
            final P prod = (P) it.next();
            prod.close();
        }
        // at this point the _producers map is empty
    }

    /**
     * Close all producers or consumers. This is called either in the error case or when closing the session normally.
     *
     * @param amqe the exception, may be null to indicate no error has occurred
     */
    private void closeProducersAndConsumers(QpidException amqe) throws JMSException
    {
        JMSException jmse = null;
        try
        {
            closeProducers();
        }
        catch (JMSException e)
        {
            _logger.error("Error closing session: " + e, e);
            jmse = e;
        }

        try
        {
            closeConsumers(amqe);
        }
        catch (JMSException e)
        {
            _logger.error("Error closing session: " + e, e);
            if (jmse == null)
            {
                jmse = e;
            }
        }

        if (jmse != null)
        {
            throw jmse;
        }
    }

    /**
     * Register to consume from the queue.
     * @param queueName
     */
    private void consumeFromQueue(C consumer, String queueName, boolean nowait) throws QpidException, FailoverException
    {
        Link link = consumer.getDestination().getLink();
        String linkName;
        if(link != null && link.getName() != null && consumer.getDestination().getAddressType() == AMQDestination.QUEUE_TYPE)
        {
            linkName = link.getName();
        }
        else
        {
            linkName = String.valueOf(_nextTag++);
        }

        consumer.setConsumerTag(linkName);
        // we must register the consumer in the map before we actually start listening
        _consumers.put(linkName, consumer);

        synchronized (consumer.getDestination())
        {
            _destinationConsumerCount.putIfAbsent(consumer.getDestination(), new AtomicInteger());
            _destinationConsumerCount.get(consumer.getDestination()).incrementAndGet();
        }


        try
        {
            sendConsume(consumer, queueName, nowait);
        }
        catch (QpidException e)
        {
            // clean-up the map in the event of an error
            _consumers.remove(linkName);
            throw e;
        }
    }

    void handleLinkCreation(AMQDestination dest) throws QpidException
    {
        createBindings(dest, dest.getLink().getBindings());
    }


    void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws QpidException
    {
        String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
                .getAddressName() : "amq.topic";

        String defaultQueueName = null;
        if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
        {
            defaultQueueName = dest.getQueueName();
        }
        else
        {
            defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
        }

        for (AMQDestination.Binding binding: bindings)
        {
            String queue = binding.getQueue() == null?
                    defaultQueueName: binding.getQueue();

            String exchange = binding.getExchange() == null ?
                    defaultExchangeForBinding :
                    binding.getExchange();

            if (_logger.isDebugEnabled())
            {
                _logger.debug("Binding queue : " + queue +
                              " exchange: " + exchange +
                              " using binding key " + binding.getBindingKey() +
                              " with args " + Strings.printMap(binding.getArgs()));
            }
            doBind(dest, binding, queue, exchange);
        }
    }

    protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws QpidException;

    abstract void handleExchangeNodeCreation(AMQDestination dest) throws QpidException;

    abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange)
            throws QpidException;

    public abstract void sendConsume(C consumer, String queueName,
                                     boolean nowait) throws QpidException, FailoverException;

    private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
            throws JMSException
    {
        return new FailoverRetrySupport<P, JMSException>(
                new FailoverProtectedOperation<P, JMSException>()
                {
                    public P execute() throws JMSException, FailoverException
                    {
                        checkNotClosed();
                        long producerId = getNextProducerId();

                        P producer;
                        try
                        {
                            producer = createMessageProducer(destination, mandatory,
                                    immediate, producerId);
                        }
                        catch (TransportException e)
                        {
                            throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
                        }

                        registerProducer(producerId, producer);

                        return producer;
                    }
                }, _connection).execute();
    }

    public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
                                            final Boolean immediate, final long producerId) throws JMSException;

    private void declareExchange(AMQDestination amqd, boolean nowait) throws QpidException
    {
        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(),
                        amqd.isExchangeAutoDelete(), amqd.isExchangeInternal());
    }

    /**
     * Returns the number of messages currently queued for the given destination.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param amqd The destination to be checked
     *
     * @return the number of queued messages.
     *
     * @throws QpidException If the queue cannot be declared for any reason.
     */
    public long getQueueDepth(final AMQDestination amqd)
            throws QpidException
    {
        return getQueueDepth(amqd, false);
    }

    /**
     * Returns the number of messages currently queued by the given
     * destination. Syncs session before receiving the queue depth if sync is
     * set to true.
     *
     * @param amqd AMQ destination to get the depth value
     * @param sync flag to sync session before receiving the queue depth
     * @return queue depth
     * @throws QpidException
     */
    public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws QpidException
    {
        return new FailoverNoopSupport<Long, QpidException>(new FailoverProtectedOperation<Long, QpidException>()
        {
            public Long execute() throws QpidException, FailoverException
            {
                try
                {
                    return requestQueueDepth(amqd, sync);
                }
                catch (TransportException e)
                {
                    throw new AMQException(getErrorCode(e), e.getMessage(), e);
                }
            }
        }, _connection).execute();
    }

    protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws QpidException, FailoverException;

    /**
     * Declares the named exchange and type of exchange.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param name            The name of the exchange to declare.
     * @param type            The type of the exchange to declare.
     * @param nowait
     * @param durable
     * @param autoDelete
     * @param internal
     * @throws QpidException If the exchange cannot be declared for any reason.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    void declareExchange(final String name, final String type,
                         final boolean nowait, final boolean durable,
                         final boolean autoDelete, final boolean internal) throws QpidException
    {
        new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>()
        {
            public Object execute() throws QpidException, FailoverException
            {
                sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal);
                return null;
            }
        }, _connection).execute();
    }

    void declareExchange(final String name, final String type,
                         final boolean nowait, final boolean durable,
                         final boolean autoDelete, final Map<String,Object> arguments,
                         final boolean passive) throws QpidException
    {
        new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>()
        {
            public Object execute() throws QpidException, FailoverException
            {
                sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive);
                return null;
            }
        }, _connection).execute();
    }

    protected String preprocessAddressTopic(final C consumer,
                                            String queueName) throws QpidException
    {
        if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
        {
            if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
            {
                String selector =  consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();

                createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
                queueName = consumer.getDestination().getAMQQueueName();
                consumer.setQueuename(queueName);
            }
            handleLinkCreation(consumer.getDestination());
        }
        return queueName;
    }

    abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws
                                                                                                        QpidException;

    public abstract void sendExchangeDeclare(final String name, final String type, final boolean nowait,
                                             boolean durable, boolean autoDelete, boolean internal) throws
                                                                                                    QpidException, FailoverException;


    public abstract void sendExchangeDeclare(final String name,
                                             final String type,
                                             final boolean nowait,
                                             boolean durable,
                                             boolean autoDelete,
                                             Map<String,Object> arguments,
                                             final boolean passive) throws QpidException, FailoverException;

    /**
     * Declares a queue for a JMS destination.
     * <p>
     * Note that for queues but not topics the name is generated in the client rather than the server. This allows
     * the name to be reused on failover if required. In general, the destination indicates whether it wants a name
     * generated or not.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     *
     * @param amqd            The destination to declare as a queue.
     * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
     *         the client.
     *
     *
     *
     * @throws QpidException If the queue cannot be declared for any reason.
     * TODO  Verify the destiation is valid or throw an exception.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    protected String declareQueue(final AMQDestination amqd,
                                  final boolean noLocal) throws QpidException
    {
        return declareQueue(amqd, noLocal, false);
    }

    protected String declareQueue(final AMQDestination amqd,
                                  final boolean noLocal, final boolean nowait)
                throws QpidException
    {
        return declareQueue(amqd, noLocal, nowait, false);
    }

    protected abstract String declareQueue(final AMQDestination amqd,
                                           final boolean noLocal, final boolean nowait, final boolean passive) throws
                                                                                                               QpidException;

    /**
     * Undeclares the specified queue.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param queueName The name of the queue to delete.
     *
     * @throws JMSException If the queue could not be deleted for any reason.
     */
    @Override
    public void deleteQueue(final String queueName) throws JMSException
    {
        try
        {
            new FailoverRetrySupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>()
            {
                public Object execute() throws QpidException, FailoverException
                {
                    sendQueueDelete(queueName);
                    return null;
                }
            }, _connection).execute();
        }
        catch (QpidException e)
        {
            throw toJMSException("The queue deletion failed: " + e.getMessage(), e);
        }
    }

    /**
     * Undeclares the specified temporary queue/topic.
     * <p>
     * Note that this operation automatically retries in the event of fail-over.
     *
     * @param amqQueue The name of the temporary destination to delete.
     *
     * @throws JMSException If the queue could not be deleted for any reason.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
    {
        deleteQueue(amqQueue.getAMQQueueName());
    }

    public abstract void sendQueueDelete(final String queueName) throws QpidException, FailoverException;

    private long getNextProducerId()
    {
        return ++_nextProducerId;
    }

    protected boolean hasMessageListeners()
    {
        return _hasMessageListeners;
    }

    private void markClosedConsumers() throws JMSException
    {
        if (_dispatcher != null)
        {
            _dispatcher.close();
            _dispatcher = null;
        }
        // we need to clone the list of consumers since the close() method updates the _consumers collection
        // which would result in a concurrent modification exception
        final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());

        final Iterator<C> it = clonedConsumers.iterator();
        while (it.hasNext())
        {
            final C con = it.next();
            con.markClosed();
        }
        // at this point the _consumers map will be empty
    }

    private void markClosedProducersAndConsumers()
    {
        try
        {
            // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
            closeProducers();
        }
        catch (JMSException e)
        {
            _logger.error("Error closing session: " + e, e);
        }

        try
        {
            markClosedConsumers();
        }
        catch (JMSException e)
        {
            _logger.error("Error closing session: " + e, e);
        }
    }

    /**
     * Callers must hold the failover mutex before calling this method.
     *
     * @param consumer
     *
     * @throws QpidException
     */
    private void registerConsumer(C consumer, boolean nowait) throws QpidException
    {
        AMQDestination amqd = consumer.getDestination();

        if (amqd.getDestSyntax() == DestSyntax.ADDR)
        {
            resolveAddress(amqd,true,consumer.isNoLocal());
        }
        else
        {
            if (_declareExchanges && !amqd.neverDeclare())
            {
                declareExchange(amqd, nowait);
            }

            if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
            {
                declareQueue(amqd, consumer.isNoLocal(), nowait);
            }
            if (_bindQueues && !amqd.neverDeclare() && !amqd.isDefaultExchange())
            {
                if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
                {
                    bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(),
                            amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait);
                }
            }

        }

        String queueName = amqd.getAMQQueueName();

        // store the consumer queue name
        consumer.setQueuename(queueName);

        // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
        if (!_immediatePrefetch)
        {
            // The dispatcher will be null if we have just created this session
            // so suspend the channel before we register our consumer so that we don't
            // start prefetching until a receive/mListener is set.
            if (_dispatcher == null)
            {
                if (!isSuspended())
                {
                    try
                    {
                        suspendChannel(true);
                        _logger.debug(
                                "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
                    }
                    catch (QpidException e)
                    {
                        _logger.info("Suspending channel threw an exception:", e);
                    }
                }
            }
        }
        else
        {
            _logger.debug("Immediately prefetching existing messages to new consumer.");
        }

        try
        {
            consumeFromQueue(consumer, queueName, nowait);
        }
        catch (FailoverException e)
        {
            throw new QpidException("Fail-over exception interrupted basic consume.", e);
        }
    }

    protected abstract boolean isBound(String exchangeName, String amqQueueName, String routingKey)
            throws QpidException;

    private void registerProducer(long producerId, MessageProducer producer)
    {
        _producers.put(producerId, producer);
    }

    private void rejectMessagesForConsumerTag(String consumerTag)
    {
        Iterator<Dispatchable> messages = _queue.iterator();
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ")");

            if (messages.hasNext())
            {
                _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
            }
            else
            {
                _logger.debug("No messages in _queue to reject");
            }
        }
        while (messages.hasNext())
        {
            UnprocessedMessage message = (UnprocessedMessage) messages.next();

            if (message.getConsumerTag().equals(consumerTag))
            {

                if (_queue.remove(message))
                {
                    if (_logger.isDebugEnabled())
                    {
                        _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
                                      + message.getDeliveryTag());
                    }

                    rejectMessage(message, true);

                    if (_logger.isDebugEnabled())
                    {
                        _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag);
                    }
                }
            }
        }
    }

    private void resubscribeConsumers() throws QpidException
    {
        ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
        _consumers.clear();

        for (C consumer : consumers)
        {
            consumer.failedOverPre();
            registerConsumer(consumer, true);
            consumer.failedOverPost();
        }
    }

    private void resubscribeProducers() throws QpidException
    {
        ArrayList producers = new ArrayList(_producers.values());
        _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}",
                                           producers,
                                           producers.size())); // FIXME: removeKey
        for (Iterator it = producers.iterator(); it.hasNext();)
        {
            P producer = (P) it.next();
            producer.resubscribe();
        }
    }

    /**
     * Suspends or unsuspends this session.
     *
     * @param suspend true indicates that the session should be suspended, false indicates that it
     *                should be unsuspended.
     *
     * @throws QpidException If the session cannot be suspended for any reason.
     * TODO  Be aware of possible changes to parameter order as versions change.
     */
    protected void suspendChannel(boolean suspend) throws QpidException
    {
        synchronized (_suspensionLock)
        {
            try
            {
                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
                }

                _suspended = suspend;
                sendSuspendChannel(suspend);
            }
            catch (FailoverException e)
            {
                throw new QpidException("Fail-over interrupted suspend/unsuspend channel.", e);
            }
            catch (TransportException e)
            {
                throw new AMQException(getErrorCode(e), e.getMessage(), e);
            }
        }
    }

    public abstract void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException;

    boolean tryLockMessageDelivery()
    {
        try
        {
            // Use timeout of zero to respect fairness. See ReentrantLock#tryLock JavaDocs for details.
            return _messageDeliveryLock.tryLock(0, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void lockMessageDelivery()
    {
        _messageDeliveryLock.lock();
    }

    void unlockMessageDelivery()
    {
        _messageDeliveryLock.unlock();
    }

    /**
     * Indicates whether this session consumers pre-fetche messages
     *
     * @return true if this session consumers pre-fetche messages false otherwise
     */
    public boolean prefetch()
    {
        return _prefetchHighMark > 0;
    }

    /** Signifies that the session has pending sends to commit. */
    public void markDirty()
    {
        _dirty = true;
    }

    /** Signifies that the session has no pending sends to commit. */
    public void markClean()
    {
        _dirty = false;
        _failedOverDirty = false;
    }

    /**
     * Check to see if failover has occured since the last call to markClean(commit or rollback).
     *
     * @return boolean true if failover has occured.
     */
    public boolean hasFailedOverDirty()
    {
        return _failedOverDirty;
    }

    public void setTicket(int ticket)
    {
        _ticket = ticket;
    }

    /**
     * Tests whether flow to this session is blocked.
     *
     * @return true if flow is blocked or false otherwise.
     */
    public abstract boolean isFlowBlocked();

    public abstract void setFlowControl(final boolean active);

    Object getDispatcherLock()
    {
        Dispatcher dispatcher = _dispatcher;
        return dispatcher == null ? null : dispatcher._lock;
    }

    public String createTemporaryQueueName()
    {
        String prefix = _connection.getTemporaryQueuePrefix();
        assert(prefix.isEmpty() || prefix.endsWith("/"));
        return prefix + "TempQueue" + UUID.randomUUID();
    }

    public interface Dispatchable
    {
        void dispatch(AMQSession ssn);
    }

    public void dispatch(UnprocessedMessage message)
    {
        if (_dispatcher == null)
        {
            throw new java.lang.IllegalStateException("dispatcher is not started");
        }

        _dispatcher.dispatchMessage(message);
    }

    /** Used for debugging in the dispatcher. */
    private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");

    /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
    class Dispatcher implements Runnable
    {

        /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
        private final AtomicBoolean _closed = new AtomicBoolean(false);
        private final CountDownLatch _closeCompleted = new CountDownLatch(1);

        private final Object _lock = new Object();
        private final String dispatcherID = "" + System.identityHashCode(this);

        public Dispatcher()
        {
        }

        public void close()
        {
            _closed.set(true);
            _queue.close();
            _dispatcherThread.interrupt();

            // If we are not the dispatcherThread we need to await the exiting of the Dispatcher#run(). See QPID-6672.
            if (Thread.currentThread() != _dispatcherThread)
            {
                try
                {
                    if(!_closeCompleted.await(_dispatcherShutdownTimeoutMs, TimeUnit.MILLISECONDS))
                    {
                        throw new RuntimeException("Dispatcher did not close down within the timeout of " + _dispatcherShutdownTimeoutMs + " ms.");
                    }
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private AtomicBoolean getClosed()
        {
            return _closed;
        }


        public void rollback()
        {

            synchronized (_lock)
            {
                boolean isStopped = connectionStopped();

                if (!isStopped)
                {
                    setConnectionStopped(true);
                }

                setRollbackMark();

                _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");

                for (C consumer : _consumers.values())
                {
                    if (!consumer.isBrowseOnly())
                    {
                        consumer.releasePendingMessages();
                    }
                    else
                    {
                        // should perhaps clear the _SQ here.
                        consumer.clearReceiveQueue();
                    }

                }

                setConnectionStopped(isStopped);
            }

        }

        public void recover()
        {

            synchronized (_lock)
            {
                boolean isStopped = connectionStopped();

                if (!isStopped)
                {
                    setConnectionStopped(true);
                }

                _dispatcherLogger.debug("Session clearing the consumer queues");

                for (C consumer : _consumers.values())
                {
                    List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
                    _prefetchedMessageTags.addAll(tags);
                }

                setConnectionStopped(isStopped);
            }

        }


        public void run()
        {
            try
            {
                if (_dispatcherLogger.isDebugEnabled())
                {
                    _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
                }

                // Allow dispatcher to start stopped
                synchronized (_lock)
                {
                    while (!_closed.get() && connectionStopped())
                    {
                        try
                        {
                            _lock.wait();
                        }
                        catch (InterruptedException e)
                        {
                            Thread.currentThread().interrupt();
                        }
                    }
                }

                try
                {

                    while (((_queue.blockingPeek()) != null) && !_closed.get())
                    {
                        synchronized (_lock)
                        {
                            if (!isClosed() && !isClosing() && !_closed.get())
                            {
                                Dispatchable disp = _queue.nonBlockingTake();

                                if(disp != null)
                                {
                                    disp.dispatch(AMQSession.this);
                                }
                            }
                        }
                    }
                }
                catch (InterruptedException e)
                {
                    // ignored as run will exit immediately
                }
            }
            finally
            {
                _closeCompleted.countDown();
                if (_dispatcherLogger.isDebugEnabled())
                {
                    _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
                }
            }
        }

        // only call while holding lock
        final boolean connectionStopped()
        {
            return _connectionStopped.get();
        }

        boolean setConnectionStopped(boolean connectionStopped)
        {
            boolean currently = _connectionStopped.get();
            if(connectionStopped != currently)
            {
                synchronized (_lock)
                {
                    _connectionStopped.set(connectionStopped);
                    _lock.notifyAll();

                    if (_dispatcherLogger.isDebugEnabled())
                    {
                        _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped
                                ? "Stopped"
                                : "Started")
                                                + ": Currently " + (currently ? "Stopped" : "Started"));
                    }
                }
            }
            return currently;
        }

        private void dispatchMessage(UnprocessedMessage message)
        {
            long deliveryTag = message.getDeliveryTag();

            synchronized (_lock)
            {

                try
                {
                    while (!getAMQConnection().isFailingOver() && connectionStopped())
                    {
                        _lock.wait();
                    }
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }

                if (!(message instanceof CloseConsumerMessage)
                    && tagLE(deliveryTag, _rollbackMark.get()))
                {
                    if (_logger.isDebugEnabled())
                    {
                        _logger.debug("Rejecting message because delivery tag " + deliveryTag
                                + " <= rollback mark " + _rollbackMark.get());
                    }
                    rejectMessage(message, true);
                }
                else if (_usingDispatcherForCleanup)
                {
                    _prefetchedMessageTags.add(deliveryTag);
                }
                else
                {
                    while (!isClosed() && !isClosing())
                    {
                        if (tryLockMessageDelivery())
                        {
                            try
                            {
                                notifyConsumer(message);
                                break;
                            }
                            finally
                            {
                                unlockMessageDelivery();
                            }
                        }
                    }
                }
            }

            long current = _rollbackMark.get();
            if (updateRollbackMark(current, deliveryTag))
            {
                _rollbackMark.compareAndSet(current, deliveryTag);
            }
        }

        private void notifyConsumer(UnprocessedMessage message)
        {
            final C consumer = _consumers.get(message.getConsumerTag());

            if ((consumer == null) || consumer.isClosed() || consumer.isClosing())
            {
                if (_dispatcherLogger.isInfoEnabled())
                {
                    if (consumer == null)
                    {
                        _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message("
                                               + System.identityHashCode(message) + ")" + "["
                                               + message.getDeliveryTag() + "] from queue "
                                               + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
                    }
                    else
                    {
                        if (consumer.isBrowseOnly())
                        {
                            _dispatcherLogger.info("Received a message("
                                                   + System.identityHashCode(message) + ")" + "["
                                                   + message.getDeliveryTag() + "] from queue " + " consumer("
                                                   + message.getConsumerTag() + ") is closed and a browser so dropping...");
                            //DROP MESSAGE
                            return;

                        }
                        else
                        {
                            _dispatcherLogger.info("Received a message("
                                                   + System.identityHashCode(message) + ")" + "["
                                                   + message.getDeliveryTag() + "] from queue " + " consumer("
                                                   + message.getConsumerTag() + ") is closed rejecting(requeue)...");
                        }
                    }
                }
                // Don't reject if we're already closing
                if (!_closed.get())
                {
                    if (_logger.isDebugEnabled())
                    {
                        _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
                                + " for closing consumer " + String.valueOf(consumer == null? null: consumer.getConsumerTag()));
                    }
                    rejectMessage(message, true);
                }
            }
            else
            {
                consumer.notifyMessage(message);
            }
        }
    }

    protected abstract boolean tagLE(long tag1, long tag2);

    protected abstract boolean updateRollbackMark(long current, long deliveryTag);

    public abstract AMQMessageDelegateFactory getMessageDelegateFactory();

    private class SuspenderRunner implements Runnable
    {
        private AtomicBoolean _suspend;

        public SuspenderRunner(AtomicBoolean suspend)
        {
            _suspend = suspend;
        }

        public void run()
        {
            try
            {
                synchronized (_suspensionLock)
                {
                    // If the session has closed by the time we get here
                    // then we should not attempt to write to the session/channel.
                    if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                    {
                        suspendChannel(_suspend.get());
                    }
                }
            }
            catch (QpidException e)
            {
                _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e);
                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Is the _queue empty?" + _queue.isEmpty());
                    _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher.getClosed()));
                }
            }
        }
    }

    /**
     * Checks if the Session and its parent connection are closed
     *
     * @return <tt>true</tt> if this is closed, <tt>false</tt> otherwise.
     */
    @Override
    public boolean isClosed()
    {
        return super.isClosed() || _connection.isClosed();
    }

    public boolean isSessionClosed()
    {
        return super.isClosed();
    }

    /**
     * Checks if the Session and its parent connection are capable of performing
     * closing operations
     *
     * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
     */
    @Override
    public boolean isClosing()
    {
        return super.isClosing() || _connection.isClosing();
    }

    public boolean isDeclareExchanges()
    {
    	return _declareExchanges;
    }

    JMSException toJMSException(String message, TransportException e)
    {
        int code = getErrorCode(e);
        return JMSExceptionHelper.chainJMSException(new JMSException(message, Integer.toString(code)), e);
    }

    private int getErrorCode(TransportException e)
    {
        int code = ErrorCodes.INTERNAL_ERROR;
        if (e instanceof SessionException)
        {
            SessionException se = (SessionException) e;
            if(se.getException() != null && se.getException().getErrorCode() != null)
            {
                code = se.getException().getErrorCode().getValue();
            }
        }
        return code;
    }

    JMSException toJMSException(String message, QpidException e)
    {
        JMSException ex;
        int errorCode = 0;

        if (e instanceof AMQException)
        {
            errorCode = ((AMQException) e).getErrorCode();
        }

        if (errorCode == ErrorCodes.ACCESS_REFUSED)
        {
            ex = JMSExceptionHelper.chainJMSException(new JMSSecurityException(message,
                                                                               Integer.toString(errorCode)), e);
        }
        else
        {
            ex = JMSExceptionHelper.chainJMSException(new JMSException(message, errorCode == 0 ? null : Integer.toString(errorCode)), e);
        }
        return ex;
    }

    private boolean isBrowseOnlyDestination(Destination destination)
    {
        return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
    }

    private void setRollbackMark()
    {
        // Let the dispatcher know that all the incomming messages
        // should be rolled back(reject/release)
        _rollbackMark.set(_highestDeliveryTag.get());
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Rollback mark is set to " + _rollbackMark.get());
        }
    }


    public MessageEncryptionHelper getMessageEncryptionHelper()
    {
        return _messageEncryptionHelper;
    }

    protected void drainDispatchQueueWithDispatcher()
    {
        if (!_queue.isEmpty())
        {
            try
            {
                setUsingDispatcherForCleanup(true);
                drainDispatchQueue();
            }
            finally
            {
                setUsingDispatcherForCleanup(false);
            }
        }
    }

    protected void stopExistingDispatcher()
    {
        Dispatcher dispatcher = _dispatcher;
        if (dispatcher != null)
        {
            dispatcher.setConnectionStopped(true);
        }
    }

    protected void suspendChannelIfNotClosing() throws QpidException
    {
        if (!(isClosed() || isClosing()))
        {
            suspendChannel(true);
        }
    }

    private void shutdownFlowControlNoAckTaskPool()
    {
        if (_flowControlNoAckTaskPool != null)
        {
            _flowControlNoAckTaskPool.shutdown();
        }
    }

}

