/*
*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageEOFException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.StreamMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.ConnectionRedirectException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.security.CallbackHandlerRegistry;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.jms.Session;
import org.apache.qpid.jndi.ObjectFactory;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.url.URLSyntaxException;

public class AMQConnection extends Closeable implements CommonConnection, Referenceable,
                                                        ClassLoadingAwareObjectInputStream.TrustedClassFilter
{
    public static final String JNDI_ADDRESS_CONNECTION_URL = "connectionURL";

    private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);

    static
    {
        ClientProperties.ensureIsLoaded();
    }

    private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
    private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT,
                                                                   ClientProperties.DEFAULT_CLOSE_TIMEOUT);

    private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();

    private final List<String> _whiteListedClassHierarchies;
    private final List<String> _blackListedClassHierarchies;

    /**
     * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
     * held by any child objects of this connection such as the session, producers and consumers.
     */
    private final Object _failoverMutex = new Object();

    private final Object _sessionCreationLock = new Object();
    private final ConnectAttemptListener _connectAttemptListener;

    /**
     * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
     * and we must prevent the client from opening too many.
     */
    private long _maximumChannelCount;

    /** The maximum size of frame supported by the server */
    private long _maximumFrameSize;

    /**
     * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
     * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
     * handler.
     */
    private final AMQProtocolHandler _protocolHandler;

    /** Maps from session id (Integer) to AMQSession instance */
    private final ChannelToSessionMap _sessions = new ChannelToSessionMap();

    private String _clientName;

    private volatile String _username;

    /** The virtual path to connect to on the AMQ server */
    private String _virtualHost;

    /** The exception listener for this connection object. */
    private volatile ExceptionListener _exceptionListener;

    private ConnectionListener _connectionListener;

    private final ConnectionURL _connectionURL;

    /**
     * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
     * publication.
     */
    private volatile boolean _started;

    /** Policy dictating how to failover */
    private FailoverPolicy _failoverPolicy;

    /*
     * _Connected should be refactored with a suitable wait object.
     */
    private boolean _connected;

    private boolean _connectionAttempted;

    /*
     * The connection meta data
     */
    private QpidConnectionMetaData _connectionMetaData;

    private String _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
    private String _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
    private String _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
    private String _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;

    /**
     * Thread Pool for executing connection level processes such as reporting asynchronous exceptions
     * and for 0-8..0-91 returning bounced messages.
     */
    private final ScheduledExecutorService _taskPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
    {
        @Override
        public Thread newThread(final Runnable r)
        {
            final String name = "Connection_" + AMQConnection.this._connectionNumber + "_task";
            _logger.debug("Creating connection pooled thread '{}'", name);
            Thread thread = new Thread(r, name);
            if (!thread.isDaemon())
            {
                thread.setDaemon(true);
            }

            return thread;
        }
    });

    private AMQConnectionDelegate _delegate;

    // this connection maximum number of prefetched messages
    private int _maxPrefetch;

    //Indicates whether persistent messages are synchronized
    private boolean _syncPersistence;

    //Indicates whether we need to sync on every message ack
    private boolean _syncAck;

    //Indicates whether we need to sync on every message ack on a client ack session, default to true
    private final boolean _syncClientAck;

    //Indicates the sync publish options (persistent|all)
    //By default it's async publish
    private String _syncPublish = "";

    //Indicates whether user-id should be attached to every sent message
    //By default the user ID is attached
    private boolean _populateUserId = true;

    // Indicates whether to use the old map message format or the
    // new amqp-0-10 encoded format.
    private boolean _useLegacyMapMessageFormat;

    // Indicates whether to use the old stream message format or the
    // new amqp-0-10 list encoded format.
    private boolean _useLegacyStreamMessageFormat;

    // When sending to a Queue destination for the first time, check that the queue is bound
    private final boolean _validateQueueOnSend;

    //used to track the last failover time for
    //Address resolution purposes
    private volatile long _lastFailoverTime = 0;

    private boolean _compressMessages;
    private int _messageCompressionThresholdSize;

    private final Map<String, String> _virtualHostProperties = new HashMap<>();
    private volatile boolean _virtualHostPropertiesPopulated;

    static
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Qpid version : " +  CommonProperties.getVersionString());
        }

        // The registering of any additional SASL mechanisms with the Java Security API requires
        // SecurityManager permissions.  In execution environments such as web containers,
        // this may require adjustments to the Java security.policy.
        CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance();
        if (_logger.isDebugEnabled())
        {
           _logger.debug("Loaded mechanisms " + registry.getMechanisms());
        }
    }

    private ConnectionSettings _connectionSettings;
    private final ConcurrentMap<String, KeyStore> _brokerTrustStores = new ConcurrentHashMap<>();
    private Session _brokerTrustStoreSession;

    /**
     * @param broker      brokerdetails
     * @param username    username
     * @param password    password
     * @param clientName  clientid
     * @param virtualHost virtualhost
     *
     * @throws QpidException
     * @throws URLSyntaxException
     */
    public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
            throws QpidException, URLSyntaxException
    {
        this(new AMQConnectionURL(
                ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
                + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
                + BrokerDetails.checkTransport(broker) + "'"));
    }

    public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
            throws QpidException, URLSyntaxException
    {
        this(new AMQConnectionURL(
                   ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
                   + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"));
    }

    public AMQConnection(String connection) throws QpidException, URLSyntaxException
    {
        this(new AMQConnectionURL(connection));
    }

    public AMQConnection(ConnectionURL connectionURL) throws QpidException
    {
        this(connectionURL, null);
    }

    AMQConnection(final ConnectionURL connectionURL, final ConnectAttemptListener connectAttemptListener)
            throws QpidException
    {
        _connectAttemptListener = connectAttemptListener;
        boolean success = false;
        try
        {
            if (connectionURL == null)
            {
                throw new IllegalArgumentException("Connection must be specified");
            }

            if (_logger.isDebugEnabled())
            {
                _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL);
            }

            // set this connection maxPrefetch
            if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
            {
                _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
            }
            else
            {
                // use the default value set for all connections
                _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
                        ClientProperties.MAX_PREFETCH_DEFAULT));
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
            {
                _syncPersistence =
                    Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
                _logger.warn("sync_persistence is a deprecated property, " +
                        "please use sync_publish={persistent|all} instead");
            }
            else
            {
                // use the default value set for all connections
                _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
                if (_syncPersistence)
                {
                    _logger.warn("sync_persistence is a deprecated property, " +
                            "please use sync_publish={persistent|all} instead");
                }
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
            {
                _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
            }
            else
            {
                // use the default value set for all connections
                _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_CLIENT_ACK) != null)
            {
                _syncClientAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_CLIENT_ACK));
            }
            else
            {
                String legacyProperty = System.getProperty("qpid.sync_after_client.ack");
                if (legacyProperty != null)
                {
                    _logger.warn("'qpid.sync_after_client.ack' is a deprecated system property, " +
                                 "please use '{}' instead", ClientProperties.SYNC_CLIENT_ACK);
                }
                _syncClientAck = Boolean.parseBoolean(System.getProperty(ClientProperties.SYNC_CLIENT_ACK,
                                                                         legacyProperty != null ? legacyProperty : "true"));
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
            {
                _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
            }
            else
            {
                // use the default value set for all connections
                _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_POPULATE_USER_ID) != null)
            {
                _populateUserId = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_POPULATE_USER_ID));
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
            {
                _useLegacyMapMessageFormat =  Boolean.parseBoolean(
                        connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT));
            }
            else
            {
                // use the default value set for all connections
                _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null)
            {
                _useLegacyStreamMessageFormat =  Boolean.parseBoolean(
                        connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT));
            }
            else
            {
                // use the default value set for all connections
                _useLegacyStreamMessageFormat = System.getProperty(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT) == null ?
                        true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT);
            }

            if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null)
            {
                _validateQueueOnSend = Boolean.parseBoolean(
                                    connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
            }
            else
            {
                _validateQueueOnSend =
                    Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
            }

            if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null)
            {
                _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES));
            }
            else
            {
                _compressMessages =
                        Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES,
                                             String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES)));
            }

            if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null)
            {
                _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE));
            }
            else
            {
                _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE,
                                                                    ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE);
            }
            if(_messageCompressionThresholdSize <= 0)
            {
                _messageCompressionThresholdSize = Integer.MAX_VALUE;
            }

            String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
            if (_logger.isDebugEnabled())
            {
                _logger.debug("AMQP version " + amqpVersion);
            }

            _failoverPolicy = new FailoverPolicy(connectionURL, this);
            if ("0-8".equals(amqpVersion))
            {
                _delegate = new AMQConnectionDelegate_8_0(this);
            }
            else if ("0-9".equals(amqpVersion))
            {
                _delegate = new AMQConnectionDelegate_0_9(this);
            }
            else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
            {
                _delegate = new AMQConnectionDelegate_0_91(this);
            }
            else
            {
                _delegate = new AMQConnectionDelegate_0_10(this);
            }

            _connectionURL = connectionURL;

            _clientName = connectionURL.getClientName();

            setVirtualHost(connectionURL.getVirtualHost());

            if (connectionURL.getDefaultQueueExchangeName() != null)
            {
                _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
            }

            if (connectionURL.getDefaultTopicExchangeName() != null)
            {
                _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
            }

            if (connectionURL.getTemporaryQueueExchangeName() != null)
            {
                _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
            }

            if (connectionURL.getTemporaryTopicExchangeName() != null)
            {
                _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
            }

            _protocolHandler = new AMQProtocolHandler(this);

            if (_logger.isDebugEnabled())
            {
                _logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
            }

            // We are not currently connected
            setConnected(false);

            if (_clientName != null)
            {
                makeConnection();
            }

            _connectionMetaData = new QpidConnectionMetaData();

            if (connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST) != null)
            {
                String whiteListedClassHierarchiesString = connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST);
                _whiteListedClassHierarchies = Arrays.asList(whiteListedClassHierarchiesString.split(","));
            }
            else
            {
                final String defaultWhiteListedClassHierarchiesString = System.getProperty(CommonProperties.QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST, "*");
                _whiteListedClassHierarchies = Arrays.asList(defaultWhiteListedClassHierarchiesString.split(","));
            }

            if (connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST) != null)
            {
                String blackListedClassHierarchiesString = connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST);
                _blackListedClassHierarchies = Arrays.asList(blackListedClassHierarchiesString.split(","));
            }
            else
            {
                final String defaultBlackListedClassHierarchiesString = System.getProperty(CommonProperties.QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST, "");
                _blackListedClassHierarchies = Arrays.asList(defaultBlackListedClassHierarchiesString.split(","));
            }
            success = true;
        }
        finally
        {
            if (!success)
            {
                shutdownTaskPool();
            }
        }
    }

    private void makeConnection() throws QpidException
    {
        _connectionAttempted = true;
        if(_clientName == null)
        {

            try
            {
                InetAddress addr = InetAddress.getLocalHost();
                _clientName =  addr.getHostName() + System.currentTimeMillis();
            }
            catch (UnknownHostException e)
            {
                _clientName = "UnknownHost" + UUID.randomUUID();
            }
        }
        BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
        BrokerDetails lastBrokerDetails = null;
        boolean retryAllowed = true;
        Exception connectionException = null;
        while (!isConnected() && retryAllowed && brokerDetails != null)
        {
            lastBrokerDetails = brokerDetails;
            ProtocolVersion pe = null;
            try
            {
                pe = makeBrokerConnection(brokerDetails);
            }
            catch (Exception e)
            {
                if (_logger.isInfoEnabled())
                {
                    _logger.info("Unable to connect to broker at " +
                                 _failoverPolicy.getCurrentBrokerDetails(),
                                 e);
                }
                connectionException = e;
            }

            if (pe != null)
            {
                // reset the delegate to the version returned by the
                // broker
                initDelegate(pe);
            }
            else if (!isConnected())
            {
                if(connectionException instanceof ConnectionRedirectException)
                {
                    ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException;
                    retryAllowed = true;
                    brokerDetails = new BrokerDetails(brokerDetails);
                    brokerDetails.setHost(redirect.getHost());
                    brokerDetails.setPort(redirect.getPort());
                    _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));

                }
                else
                {
                    if (repeatLastConnectAttempt(connectionException, brokerDetails))
                    {
                        brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
                    }
                    else
                    {
                        retryAllowed = _failoverPolicy.failoverAllowed();
                        brokerDetails = _failoverPolicy.getNextBrokerDetails();
                    }
                    _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
                }
            }
        }
        verifyClientID();

        if (_logger.isDebugEnabled())
        {
            _logger.debug("Are we connected:" + isConnected());
        }

        if (!isConnected())
        {
            if (_logger.isDebugEnabled())
            {
                _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
            }

            String message = null;

            if (connectionException != null)
            {
                if (connectionException.getCause() != null)
                {
                    message = connectionException.getCause().getMessage();
                }
                else
                {
                    message = connectionException.getMessage();
                }
            }

            if (message == null)
            {
                message = "Unable to Connect";
            }
            else if("".equals(message))
            {
                message = "Unable to Connect:" + connectionException.getClass();
            }

            for (Throwable th = connectionException; th != null; th = th.getCause())
            {
                if (th instanceof UnresolvedAddressException ||
                    th instanceof UnknownHostException)
                {
                    throw new AMQUnresolvedAddressException
                        (message,
                         _failoverPolicy.getCurrentBrokerDetails().toString(),
                         connectionException);
                }
            }

            throw new AMQConnectionFailureException(message, connectionException);
        }

        if (_logger.isDebugEnabled())
        {
        	_logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
        }
        notifySuccessfulConnectAttempt(lastBrokerDetails);
        _sessions.setMaxChannelID(_delegate.getMaxChannelID());
        _sessions.setMinChannelID(_delegate.getMinChannelID());
    }

    private boolean repeatLastConnectAttempt(final Exception e, final BrokerDetails brokerDetails)
    {
        boolean repeatLastReconnectAttempt = false;
        if (_connectAttemptListener != null)
        {
            final AMQException amqException;
            if (e instanceof AMQException)
            {
                amqException = (AMQException) e;
            }
            else
            {
                amqException = new AMQException(ErrorCodes.CONNECTION_FORCED, e.getMessage(), e);
            }

            try
            {
                repeatLastReconnectAttempt = _connectAttemptListener.connectAttemptFailed(brokerDetails.getURI(), convertToJMSException(amqException));
            }
            catch (RuntimeException unexpected)
            {
                _logger.warn("Unexpected exception occurred on notifying about connect attempt failure", unexpected);
            }
        }
        return repeatLastReconnectAttempt;
    }

    private void notifySuccessfulConnectAttempt(final BrokerDetails brokerDetails)
    {
        if (_connectAttemptListener != null)
        {
            try
            {
                _connectAttemptListener.connectAttemptSucceeded(brokerDetails.getURI());
            }
            catch (RuntimeException unexpected)
            {
                _logger.warn("Unexpected exception occurred on notifying about successful connect attempt", unexpected);
            }
        }
    }

    private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
    {
        try
        {
            String delegateClassName = String.format
                                    ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
                                     pe.getMajorVersion(), pe.getMinorVersion());
            if (_logger.isDebugEnabled())
            {
            	_logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
            }
            Class c = Class.forName(delegateClassName);
            Class partypes[] = new Class[1];
            partypes[0] = AMQConnection.class;
            _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);

            if (!ProtocolVersion.v0_10.equals(_delegate.getProtocolVersion()))
            {
                _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
            }

            // reset state waiter state
            _protocolHandler.getStateManager().clearLastException();
            _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED);
        }
        catch (ClassNotFoundException e)
        {
            String errorMessage = String.format("Protocol: %s.%s is required by the broker but is not " +
                                                "currently supported by this client library implementation",
                                                pe.getMajorVersion(), pe.getMinorVersion());

            throw new AMQProtocolException(errorMessage, e);
        }
        catch (NoSuchMethodException e)
        {
            throw new RuntimeException("unable to locate constructor for delegate", e);
        }
        catch (InstantiationException e)
        {
            throw new RuntimeException("error instantiating delegate", e);
        }
        catch (IllegalAccessException e)
        {
            throw new RuntimeException("error accessing delegate", e);
        }
        catch (InvocationTargetException e)
        {
            throw new RuntimeException("error invoking delegate", e);
        }
    }

    private void setVirtualHost(String virtualHost)
    {
        if (virtualHost != null && virtualHost.startsWith("/"))
        {
            virtualHost = virtualHost.substring(1);
        }

        _virtualHost = virtualHost;
    }

    public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure)
    {
        BrokerDetails bd = new BrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
        bd.setHost(host);
        bd.setPort(port);

        _failoverPolicy.setBroker(bd);

        try
        {
            makeBrokerConnection(bd);
            notifySuccessfulConnectAttempt(bd);
            return true;
        }
        catch (Exception e)
        {
            if (_logger.isInfoEnabled())
            {
                _logger.info("Unable to connect to broker at " + bd);
            }
            if (repeatLastConnectAttempt(e, bd))
            {
                return attemptReconnection(host, port, useFailoverConfigOnFailure);
            }
            else
            {
                return useFailoverConfigOnFailure && attemptReconnection();
            }
        }

    }

    public boolean attemptReconnection()
    {
        BrokerDetails broker;
        while (!isClosed() && !isClosing() && _failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
        {
            if (attemptConnection(broker))
            {
                return true;
            }
        }

        // connection unsuccessful
        return false;
    }

    private boolean attemptConnection(final BrokerDetails broker)
    {
        try
        {
            makeBrokerConnection(broker);
            notifySuccessfulConnectAttempt(broker);
            return true;
        }
        catch (Exception e)
        {
            if (!(e instanceof QpidException))
            {
                if (_logger.isInfoEnabled())
                {
                    _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
                }
            }
            else
            {
                if (_logger.isInfoEnabled())
                {
                    _logger.info(e.getMessage() + ":Unable to connect to broker at "
                                 + _failoverPolicy.getCurrentBrokerDetails());
                }
                if (repeatLastConnectAttempt(e, broker))
                {
                    return attemptConnection(broker);
                }
            }
        }
        return false;
    }

    public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, QpidException
    {
        return _delegate.makeBrokerConnection(brokerDetail);
    }

    public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
    {
        return _delegate.executeRetrySupport(operation);
    }

    /**
     * Get the details of the currently active broker
     *
     * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
     *         otherwise
     */
    public BrokerDetails getActiveBrokerDetails()
    {
        return _failoverPolicy.getCurrentBrokerDetails();
    }

    public boolean failoverAllowed()
    {
        if (!isConnected())
        {
            return false;
        }
        else
        {
            return _failoverPolicy.failoverAllowed();
        }
    }

    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
    {
        return createSession(transacted, acknowledgeMode, _maxPrefetch);
    }

    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
            throws JMSException
    {
        return createSession(transacted, acknowledgeMode, prefetch, prefetch);
    }

    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                     final int prefetchHigh, final int prefetchLow) throws JMSException
    {
        synchronized (_sessionCreationLock)
        {

            checkNotClosed();

            if(!_connectionAttempted)
            {
                try
                {
                    makeConnection();
                }
                catch (QpidException e)
                {
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Unable to establish connection"),e);
                }
            }

            if(_delegate.isVirtualHostPropertiesSupported() && !_virtualHostPropertiesPopulated)
            {
                retrieveVirtualHostPropertiesIfNecessary();
            }
            return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
        }
    }

    private void retrieveVirtualHostPropertiesIfNecessary() throws JMSException
    {
        synchronized (_virtualHostProperties)
        {
            if(!_virtualHostPropertiesPopulated)
            {
                final Session session = _delegate.createSession(false, AMQSession.NO_ACKNOWLEDGE, 3,3);
                final MessageConsumer consumer = session.createConsumer(session.createQueue(
                        "ADDR: $virtualhostProperties; {assert: never, create: never, node:{ type: queue }}"));
                try
                {
                    ((AMQSession)session).start();
                }
                catch (QpidException e)
                {
                    throw JMSExceptionHelper.chainJMSException(new JMSException(
                            "Failed to retrieve virtual host properties"), e);
                }
                Message propertiesMessage = consumer.receive(getProtocolHandler().getDefaultTimeout());
                if(propertiesMessage != null)
                {
                    for(String property : Collections.list((Enumeration<String>) propertiesMessage.getPropertyNames()))
                    {
                        _virtualHostProperties.put(property, propertiesMessage.getStringProperty(property));
                    }
                }
                session.close();
                _virtualHostPropertiesPopulated = true;
            }
        }
    }

    public KeyStore getBrokerSuppliedTrustStore(final String name) throws JMSException
    {
        synchronized(_brokerTrustStores)
        {
            if(!_brokerTrustStores.containsKey(name))
            {
                if(_brokerTrustStoreSession == null)
                {
                    _brokerTrustStoreSession = _delegate.createSession(false, AMQSession.AUTO_ACKNOWLEDGE, 1, 1);
                    try
                    {
                        ((AMQSession) _brokerTrustStoreSession).start();
                    }
                    catch (QpidException e)
                    {
                        throw JMSExceptionHelper.chainJMSException(new JMSException(
                                "Failed to retrieve virtual host properties"), e);
                    }
                }
                final MessageConsumer consumer = _brokerTrustStoreSession.createConsumer(_brokerTrustStoreSession.createQueue(
                        "ADDR: " + name + "; {assert: never, create: never, node:{ type: queue }}"));
                final Message message  = consumer.receive(2000l);
                if(message != null)
                {
                    StreamMessage streamMessage = (StreamMessage) message;
                    List<X509Certificate> certs = new ArrayList<>();
                    try
                    {
                        try
                        {

                            final CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
                            byte[] bytes;
                            while ((bytes = (byte[]) streamMessage.readObject()) != null)
                            {
                                certs.add((X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(
                                        bytes)));
                            }
                        }
                        catch (MessageEOFException e)
                        {
                            // end of message
                        }
                        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());

                        char[] encryptionTrustStorePassword =
                                getConnectionSettings().getEncryptionTrustStorePassword() == null
                                        ? null
                                        : getConnectionSettings().getEncryptionTrustStorePassword().toCharArray();

                        keyStore.load(null, encryptionTrustStorePassword);
                        int i = 1;
                        for (X509Certificate cert : certs)
                        {
                            keyStore.setCertificateEntry(String.valueOf(i++), cert);
                        }
                        _brokerTrustStores.put(name, keyStore);
                    }
                    catch (JMSException | GeneralSecurityException | IOException e)
                    {
                        _logger.error(e.getMessage(), e);
                    }
                }

            }
            return _brokerTrustStores.get(name);

        }
    }

    public void setFailoverPolicy(FailoverPolicy policy)
    {
        _failoverPolicy = policy;
    }

    public FailoverPolicy getFailoverPolicy()
    {
        return _failoverPolicy;
    }

    /**
     * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
     * the JMS spec
     *
     * @param transacted
     * @param acknowledgeMode
     *
     * @return QueueSession
     *
     * @throws JMSException
     */
    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
    {
        return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode));
    }

    /**
     * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
     * the JMS spec
     *
     * @param transacted
     * @param acknowledgeMode
     *
     * @return TopicSession
     *
     * @throws JMSException
     */
    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
    {
        return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
    }

    public boolean channelLimitReached()
    {
        return _sessions.size() >= _maximumChannelCount;
    }

    public String getClientID() throws JMSException
    {
        checkNotClosed();

        return _clientName;
    }

    public void setClientID(String clientID) throws JMSException
    {
        checkNotClosed();
        synchronized(_sessionCreationLock)
        {
            if(_connectionAttempted)
            {
                // in AMQP it is not possible to change the client ID. If one is not specified
                // upon connection construction, an id is generated automatically. Therefore
                // we can always throw an exception.
                if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
                {
                    throw new IllegalStateException("Client name cannot be changed after being set");
                }
                else
                {
                    _logger.info("Operation setClientID is ignored using ID: " + getClientID());
                }
            }
            else
            {
                _clientName = clientID;
            }
        }
    }

    public ConnectionMetaData getMetaData() throws JMSException
    {
        checkNotClosed();

        return _connectionMetaData;

    }

    protected final ExceptionListener getExceptionListenerNoCheck()
    {
        return _exceptionListener;
    }

    public ExceptionListener getExceptionListener() throws JMSException
    {
        checkNotClosed();
        return getExceptionListenerNoCheck();
    }

    public void setExceptionListener(ExceptionListener listener) throws JMSException
    {
        checkNotClosed();

        _exceptionListener = listener;
    }

    /**
     * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
     * and is not thread safe (which is legal according to the JMS specification).
     *
     * @throws JMSException
     */
    public void start() throws JMSException
    {
        checkNotClosed();
        if (!_started)
        {
            _started = true;
            final Iterator it = _sessions.values().iterator();
            while (it.hasNext())
            {
                final AMQSession s = (AMQSession) (it.next());
                try
                {
                    s.start();
                }
                catch (QpidException e)
                {
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.start failed"), e);
                }
            }

        }
    }

    public void stop() throws JMSException
    {
        checkNotClosed();
        if (_started)
        {
            for (Iterator i = _sessions.values().iterator(); i.hasNext();)
            {
                try
                {
                    ((AMQSession) i.next()).stop();
                }
                catch (QpidException e)
                {
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.stop failed."), e);
                }
            }

            _started = false;
        }
    }

    public void close() throws JMSException
    {
        close(DEFAULT_CLOSE_TIMEOUT);
    }

    private void close(long timeout) throws JMSException
    {
        boolean closed;

        synchronized (_sessionCreationLock)
        {
            closed = setClosed();
        }

        try
        {
            if (!closed)
            {
                List<AMQSession> sessions = new ArrayList<>(_sessions.values());

                setClosing(true);
                try
                {
                    doClose(sessions, timeout);
                }
                finally
                {
                    setClosing(false);
                }
            }
        }
        finally
        {
            shutdownTaskPool();
        }
    }

    private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
    {
        if (!sessions.isEmpty())
        {
            AMQSession session = sessions.remove(0);
            session.lockMessageDelivery();
            try
            {
                doClose(sessions, timeout);
            }
            finally
            {
                session.unlockMessageDelivery();
            }
        }
        else
        {
            synchronized (getFailoverMutex())
            {
                try
                {
                    closeAllSessions(null, timeout);
                }
                catch (JMSException e)
                {
                    _logger.warn("Error closing connection", e);
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e);
                }
                finally
                {
                    try
                    {
                        _delegate.closeConnection(timeout);
                    }
                    catch (Exception e)
                    {
                        _logger.warn("Error closing underlying protocol connection", e);
                    }
                }
            }
        }
    }

    private void shutdownTaskPool()
    {
        _taskPool.shutdown();
    }

    /**
     * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
     * mark objects "visible" in userland as closed after failover or other significant event that impacts the
     * connection. <p/> The caller must hold the failover mutex before calling this method.
     */
    private void markAllSessionsClosed()
    {
        final LinkedList sessionCopy = new LinkedList(_sessions.values());
        final Iterator it = sessionCopy.iterator();
        while (it.hasNext())
        {
            final AMQSession session = (AMQSession) it.next();

            session.markClosed();
        }

        _sessions.clear();
    }

    /**
     * Close all the sessions, either due to normal connection closure or due to an error occurring.
     *
     * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
     *              before calling this method.
     */
    private void closeAllSessions(Throwable cause, long timeout) throws JMSException
    {
        final LinkedList sessionCopy = new LinkedList(_sessions.values());
        final Iterator it = sessionCopy.iterator();
        JMSException sessionException = null;
        while (it.hasNext())
        {
            final AMQSession session = (AMQSession) it.next();
            if (cause != null)
            {
                session.closed(cause);
            }
            else
            {
                try
                {
                    session.close(timeout);
                }
                catch (JMSException e)
                {
                    _logger.warn("Error closing session: " + e);
                    sessionException = e;
                }
            }
        }

        _sessions.clear();
        if (sessionException != null)
        {
            throw sessionException;
        }
    }

    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException
    {
        checkNotClosed();

        throw new JmsNotImplementedException();

    }

    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
                                                       int maxMessages) throws JMSException
    {
        checkNotClosed();

        throw new JmsNotImplementedException();
    }

    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
                                                       int maxMessages) throws JMSException
    {
        checkNotClosed();

        throw new JmsNotImplementedException();
    }

    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
                                                              ServerSessionPool sessionPool, int maxMessages) throws JMSException
    {
        checkNotClosed();

        throw new JmsNotImplementedException();
    }

    public long getMaximumChannelCount() throws JMSException
    {
        checkNotClosed();

        return _maximumChannelCount;
    }

    public void setConnectionListener(ConnectionListener listener)
    {
        _connectionListener = listener;
    }

    public ConnectionListener getConnectionListener()
    {
        return _connectionListener;
    }

    public void setMaximumChannelCount(long maximumChannelCount)
    {
        _maximumChannelCount = maximumChannelCount;
    }

    public void setMaximumFrameSize(long frameMax)
    {
        _maximumFrameSize = frameMax;
    }

    public long getMaximumFrameSize()
    {
        return _maximumFrameSize;
    }

    public ChannelToSessionMap getSessions()
    {
        return _sessions;
    }

    public String getUsername()
    {
        final String username = _username;
        return username == null ? _connectionURL.getUsername() : username;
    }

    public void setUsername(String id)
    {
        _username = id;
    }

    public String getPassword()
    {
        return _connectionURL.getPassword();
    }

    public String getVirtualHost()
    {
        return _virtualHost;
    }

    public final AMQProtocolHandler getProtocolHandler()
    {
        return _protocolHandler;
    }

    public final boolean started()
    {
        return _started;
    }

    public final boolean isConnected()
    {
        return _connected;
    }

    protected final void setConnected(boolean connected)
    {
        _connected = connected;
    }

    public void bytesSent(long writtenBytes)
    {
        if (_connectionListener != null)
        {
            _connectionListener.bytesSent(writtenBytes);
        }
    }

    public void bytesReceived(long receivedBytes)
    {
        if (_connectionListener != null)
        {
            _connectionListener.bytesReceived(receivedBytes);
        }
    }

    /**
     * Fire the preFailover event to the registered connection listener (if any)
     *
     * @param redirect true if this is the result of a redirect request rather than a connection error
     *
     * @return true if no listener or listener does not veto change
     */
    public boolean firePreFailover(boolean redirect)
    {
        _lastFailoverTime = System.currentTimeMillis();
        boolean proceed = true;
        if (_connectionListener != null)
        {
            proceed = _connectionListener.preFailover(redirect);
        }

        return proceed;
    }

    /**
     * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
     * resubscription then all the sessions are closed.
     *
     * @return true if no listener or listener does not veto resubscription.
     *
     * @throws JMSException
     */
    public boolean firePreResubscribe() throws JMSException
    {
        if (_connectionListener != null)
        {
            boolean resubscribe = _connectionListener.preResubscribe();
            if (!resubscribe)
            {
                markAllSessionsClosed();
            }

            return resubscribe;
        }
        else
        {
            return true;
        }
    }

    /** Fires a failover complete event to the registered connection listener (if any). */
    public void fireFailoverComplete()
    {
        if (_connectionListener != null)
        {
            _connectionListener.failoverComplete();
        }
    }

    /**
     * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
     * "failover mutex" must be held when doing any operations that could be corrupted during failover.
     *
     * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
     */
    public final Object getFailoverMutex()
    {
        return _failoverMutex;
    }

    public void resubscribeSessions() throws JMSException, QpidException, FailoverException
    {
        _delegate.resubscribeSessions();
    }

    /**
     * If failover is taking place this will block until it has completed. If failover is not taking place it will
     * return immediately.
     *
     * @throws InterruptedException
     */
    public void blockUntilNotFailingOver() throws InterruptedException
    {
        _protocolHandler.blockUntilNotFailingOver();
    }

    /**
     * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
     * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
     * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
     *
     * @param cause the exception
     */
    public void exceptionReceived(Throwable cause)
    {

        if (_logger.isDebugEnabled())
        {
            _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
        }

        final JMSException je = convertToJMSException(cause);

        try
        {
            if (hardError(cause))
            {
                closeSessions(cause);
            }
        }
        finally
        {
            deliverJMSExceptionToExceptionListenerOrLog(je, cause);
        }
    }

    private JMSException convertToJMSException(Throwable cause)
    {
        final JMSException je;
        if (cause instanceof JMSException)
        {
            je = (JMSException) cause;
        }
        else
        {
            int errorCode = 0;

            if (cause instanceof AMQException)
            {
                errorCode = ((AMQException) cause).getErrorCode();
            }

            if (errorCode != 0)
            {
                je = JMSExceptionHelper.chainJMSException(new JMSException("Exception thrown against "
                                                                           + toString()
                                                                           + ": "
                                                                           + cause, Integer.toString(errorCode)),
                                                          cause);
            }
            else
            {
                //Should never get here as all AMQEs are required to have an ErrorCode!
                // Other than AMQDisconnectedEx!

                if (cause instanceof AMQDisconnectedException)
                {
                    Exception last = _protocolHandler.getStateManager().getLastException();
                    if (last != null)
                    {
                        _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
                        cause = last;
                    }
                }
                je = JMSExceptionHelper.chainJMSException(new JMSException("Exception thrown against "
                                                                           + toString()
                                                                           + ": "
                                                                           + cause), cause);
            }
        }
        return je;
    }

    void closed(Throwable cause)
    {
        _logger.debug("Closing closed connection {} ", this.toString());

        final JMSException je = convertToJMSException(cause);
        try
        {
            _protocolHandler.getProtocolSession().notifyError(je);
            boolean performClose = !setClosed();

            // if we are closing the connection, close sessions first
            if (performClose)
            {
                closeSessions(cause);
            }
        }
        finally
        {
            deliverJMSExceptionToExceptionListenerOrLog(je, cause);
        }
    }

    private void closeSessions(Throwable cause)
    {
        // get the failover mutex before trying to close
        synchronized (getFailoverMutex())
        {
            try
            {
                closeAllSessions(cause, -1);
            }
            catch (JMSException e)
            {
                _logger.error("Error closing all sessions: " + e, e);
            }
        }
    }

    private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause)
    {
        final ExceptionListener exceptionListener = getExceptionListenerNoCheck();
        if (exceptionListener != null)
        {
            performConnectionTask(new Runnable()
                                  {
                                      @Override
                                      public void run()
                                      {
                                          // deliver the exception if there is a listener
                                          try
                                          {
                                              exceptionListener.onException(je);
                                          }
                                          catch (RuntimeException e)
                                          {
                                              _logger.error("Exception occurred in ExceptionListener", e);
                                          }
                                      }
                                  });
        }
        else
        {
            _logger.error("Throwable Received but no listener set: " + cause);
        }


    }

    private boolean hardError(Throwable cause)
    {
        if (cause instanceof AMQException)
        {
            return ((AMQException) cause).isHardError();
        }

        return true;
    }

    void registerSession(int channelId, AMQSession session)
    {
        _sessions.put(channelId, session);
    }

    public void deregisterSession(int channelId)
    {
        _sessions.remove(channelId);
    }

    public String toString()
    {
        StringBuffer buf = new StringBuffer("AMQConnection:\n");
        if (_failoverPolicy.getCurrentBrokerDetails() == null)
        {
            buf.append("No active broker connection");
        }
        else
        {
            BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
            buf.append("Host: ").append(String.valueOf(bd.getHost()));
            buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
        }

        buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
        buf.append("\nClient ID: ").append(String.valueOf(_clientName));
        buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());

        return buf.toString();
    }

    /**
     * Returns connection url.
     * @return connection url
     */
    public ConnectionURL getConnectionURL()
    {
        return _connectionURL;
    }

    /**
     * Returns stringified connection url.   This url is suitable only for display
     * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
     * @return connection url
     */
    public String toURL()
    {
        return _connectionURL.toString();
    }

    public Reference getReference() throws NamingException
    {
        return new Reference(AMQConnection.class.getName(), new StringRefAddr(JNDI_ADDRESS_CONNECTION_URL, toURL()),
                             ObjectFactory.class.getName(), null); // factory location
    }

    public String getDefaultTopicExchangeName()
    {
        return _defaultTopicExchangeName;
    }

    public void setDefaultTopicExchangeName(String defaultTopicExchangeName)
    {
        _defaultTopicExchangeName = defaultTopicExchangeName;
    }

    public String getDefaultQueueExchangeName()
    {
        return _defaultQueueExchangeName;
    }

    public void setDefaultQueueExchangeName(String defaultQueueExchangeName)
    {
        _defaultQueueExchangeName = defaultQueueExchangeName;
    }

    public String getTemporaryTopicExchangeName()
    {
        return _temporaryTopicExchangeName;
    }

    public String getTemporaryQueueExchangeName()
    {
        return _temporaryQueueExchangeName;
    }

    public void setTemporaryTopicExchangeName(String temporaryTopicExchangeName)
    {
        _temporaryTopicExchangeName = temporaryTopicExchangeName;
    }

    public void setTemporaryQueueExchangeName(String temporaryQueueExchangeName)
    {
        _temporaryQueueExchangeName = temporaryQueueExchangeName;
    }

    public void performConnectionTask(Runnable task)
    {
        try
        {
            _taskPool.execute(task);
        }
        catch (RejectedExecutionException e)
        {
            if(!(isClosed() || isClosing()))
            {
                throw e;
            }
        }
    }

    ScheduledFuture<?> scheduleTask(Runnable task, long initialDelay, long period, TimeUnit timeUnit)
    {
        return _taskPool.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
    }

    public AMQSession getSession(int channelId)
    {
        return _sessions.get(channelId);
    }

    public ProtocolVersion getProtocolVersion()
    {
        return _delegate.getProtocolVersion();
    }

    public String getBrokerUUID()
    {
        if(getProtocolVersion().equals(ProtocolVersion.v0_10))
        {
            return ((AMQConnectionDelegate_0_10)_delegate).getUUID();
        }
        else
        {
            return null;
        }
    }

    /**
     * Tests whether the Broker has advertised support for the named feature.
     *
     * @param featureName
     *
     * @return true if the feature is supported, or false otherwise.
     */
    boolean isSupportedServerFeature(final String featureName)
    {
        return _delegate.isSupportedServerFeature(featureName);
    }

    public boolean isFailingOver()
    {
        return (_protocolHandler.getFailoverLatch() != null);
    }

    /**
     * Get the maximum number of messages that this connection can pre-fetch.
     *
     * @return The maximum number of messages that this connection can pre-fetch.
     */
    public long getMaxPrefetch()
    {
        return _maxPrefetch;
    }

    /**
     * Indicates whether persistent messages are synchronized
     *
     * @return true if persistent messages are synchronized false otherwise
     */
    public boolean getSyncPersistence()
    {
        return _syncPersistence;
    }

    /**
     * Indicates whether we need to sync on every message ack
     */
    public boolean getSyncAck()
    {
        return _syncAck;
    }

    boolean getSyncClientAck()
    {
        return _syncClientAck;
    }

    public String getSyncPublish()
    {
        return _syncPublish;
    }

    public boolean isPopulateUserId()
    {
        return _populateUserId;
    }

    public boolean isMessageCompressionDesired()
    {
        return _compressMessages;
    }

    public int getNextChannelID()
    {
        return _sessions.getNextChannelId();
    }

    public boolean isUseLegacyMapMessageFormat()
    {
        return _useLegacyMapMessageFormat;
    }

    public boolean isUseLegacyStreamMessageFormat()
    {
        return _useLegacyStreamMessageFormat;
    }

    private void verifyClientID() throws QpidException
    {
        if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
        {
            try
            {
                if (!_delegate.verifyClientID())
                {
                    throw new AMQException(ErrorCodes.ALREADY_EXISTS, "ClientID must be unique");
                }
            }
            catch(JMSException e)
            {
                    throw new QpidException(e.getMessage(),e);
            }
        }
    }

    public long getLastFailoverTime()
    {
         return _lastFailoverTime;
    }

    protected AMQConnectionDelegate getDelegate()
    {
        return _delegate;
    }

    public Long getConnectionNumber()
    {
        return _connectionNumber;
    }

    protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress)
    {
        if(_logger.isInfoEnabled())
        {
            _logger.info("Connection " + _connectionNumber + " now connected from "
                         + localAddress + " to " + remoteAddress);
        }
    }

    void setHeartbeatListener(HeartbeatListener listener)
    {
        _delegate.setHeartbeatListener(listener);
    }

    public boolean validateQueueOnSend()
    {
        return _validateQueueOnSend;
    }

    public int getMessageCompressionThresholdSize()
    {
        return _messageCompressionThresholdSize;
    }

    void doWithAllLocks(Runnable r)
    {
        doWithAllLocks(r, _sessions.values());

    }

    private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
    {
        if (!sessions.isEmpty())
        {
            AMQSession session = sessions.remove(0);


            Object dispatcherLock = session.getDispatcherLock();
            if (dispatcherLock == null)
            {
                dispatcherLock = new Object(); // use dummy intrinsic lock to make subsequent code nicer
            }
            synchronized (dispatcherLock)
            {
                session.lockMessageDelivery();
                try
                {
                    doWithAllLocks(r, sessions);
                }
                finally
                {
                    session.unlockMessageDelivery();
                }
            }
        }
        else
        {
            synchronized (getFailoverMutex())
            {
                r.run();
            }
        }
    }


    public String getTemporaryQueuePrefix()
    {
        if(_delegate.isVirtualHostPropertiesSupported())
        {
            final String prefix = getVirtualHostProperty("virtualHost.temporaryQueuePrefix");
            return prefix == null ? "" : prefix;
        }
        else
        {
            return "";
        }

    }

    String getVirtualHostProperty(final String propertyName)
    {
        return _virtualHostProperties.get(propertyName);
    }

    public void setConnectionSettings(final ConnectionSettings connectionSettings)
    {
        _connectionSettings = connectionSettings;
    }

    public ConnectionSettings getConnectionSettings()
    {
        return _connectionSettings;
    }

    @Override
    public boolean isTrusted(Class<?> clazz)
    {
        while (clazz.isArray())
        {
            clazz = clazz.getComponentType();
        }

        if (clazz.isPrimitive())
        {
            return true;
        }

        while (clazz != null && (clazz.isAnonymousClass() || clazz.isLocalClass()))
        {
            clazz = clazz.getEnclosingClass();
        }

        if (clazz == null || clazz.getCanonicalName() == null)
        {
            return false;
        }

        String className = clazz.getCanonicalName();

        for (String blackListedClassHierarchy : _blackListedClassHierarchies)
        {
            if ("*".equals(blackListedClassHierarchy))
            {
                return false;
            }
            else if (className != null && (className.equals(blackListedClassHierarchy) || className.startsWith(blackListedClassHierarchy + ".")))
            {
                return false;
            }
        }

        for (String whiteListedClassHierarchy : _whiteListedClassHierarchies)
        {
            if ("*".equals(whiteListedClassHierarchy))
            {
                return true;
            }
            else if (className != null && (className.equals(whiteListedClassHierarchy) || className.startsWith(whiteListedClassHierarchy + ".")))
            {
                return true;
            }
        }
        return false;
    }

    boolean isVirtualHostPropertiesSupported()
    {
        return getDelegate().isVirtualHostPropertiesSupported();
    }
}
