/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ActiveMQConnection.h"

#include <cms/Session.h>

#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQConnectionMetaData.h>
#include <activemq/core/ActiveMQMessageAudit.h>
#include <activemq/core/ActiveMQDestinationSource.h>
#include <activemq/core/AdvisoryConsumer.h>
#include <activemq/core/ConnectionAudit.h>
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
#include <activemq/core/kernels/ActiveMQProducerKernel.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/exceptions/BrokerException.h>
#include <activemq/exceptions/ConnectionFailedException.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/IdGenerator.h>
#include <activemq/transport/failover/FailoverTransport.h>
#include <activemq/transport/ResponseCallback.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/wireformat/openwire/OpenWireFormat.h>

#include <decaf/lang/Math.h>
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
#include <decaf/util/Iterator.h>
#include <decaf/util/Set.h>
#include <decaf/util/Collection.h>
#include <decaf/util/LinkedList.h>
#include <decaf/util/UUID.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/concurrent/ThreadPoolExecutor.h>
#include <decaf/util/concurrent/LinkedBlockingQueue.h>
#include <decaf/util/concurrent/locks/ReentrantReadWriteLock.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>

#include <activemq/commands/Command.h>
#include <activemq/commands/ActiveMQMessage.h>
#include <activemq/commands/BrokerInfo.h>
#include <activemq/commands/BrokerError.h>
#include <activemq/commands/ConnectionId.h>
#include <activemq/commands/DestinationInfo.h>
#include <activemq/commands/ExceptionResponse.h>
#include <activemq/commands/Message.h>
#include <activemq/commands/MessagePull.h>
#include <activemq/commands/MessageAck.h>
#include <activemq/commands/MessageDispatch.h>
#include <activemq/commands/ProducerAck.h>
#include <activemq/commands/ProducerInfo.h>
#include <activemq/commands/RemoveInfo.h>
#include <activemq/commands/ShutdownInfo.h>
#include <activemq/commands/SessionInfo.h>
#include <activemq/commands/WireFormatInfo.h>

using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::core::kernels;
using namespace activemq::core::policies;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace activemq::threads;
using namespace activemq::transport;
using namespace activemq::transport::failover;
using namespace activemq::wireformat::openwire;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;

////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace core {

    class ConnectionThreadFactory : public ThreadFactory {
    private:

        std::string connectionId;

    public:

        ConnectionThreadFactory(std::string connectionId) : connectionId(connectionId) {
            if (connectionId.empty()) {
                throw NullPointerException(__FILE__, __LINE__, "Connection Id must be set.");
            }
        }

        virtual ~ConnectionThreadFactory() {}

        virtual Thread* newThread(decaf::lang::Runnable* runnable) {
            static std::string prefix = "ActiveMQ Connection Executor: ";

            std::string name = prefix + connectionId;
            Thread* thread = new Thread(runnable, name);
            return thread;
        }

    };

    class ConnectionConfig {
    private:

        ConnectionConfig(const ConnectionConfig&);
        ConnectionConfig& operator=(const ConnectionConfig&);

    public:

        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
                                     Dispatcher*,
                                     commands::ConsumerId::COMPARATOR > DispatcherMap;

        typedef decaf::util::StlMap< Pointer<commands::ProducerId>,
                                     Pointer<ActiveMQProducerKernel>,
                                     commands::ProducerId::COMPARATOR > ProducerMap;

        typedef decaf::util::concurrent::ConcurrentStlMap< Pointer<commands::ActiveMQTempDestination>,
                                                           Pointer<commands::ActiveMQTempDestination>,
                                                           commands::ActiveMQTempDestination::COMPARATOR > TempDestinationMap;

    public:

        static util::IdGenerator CONNECTION_ID_GENERATOR;
        static DefaultTransportListener DO_NOTHING_TRANSPORT_LISTENER;

        Pointer<decaf::util::Properties> properties;
        Pointer<transport::Transport> transport;
        Pointer<util::IdGenerator> clientIdGenerator;
        Pointer<Scheduler> scheduler;
        Pointer<ExecutorService> executor;

        util::LongSequenceGenerator sessionIds;
        util::LongSequenceGenerator consumerIdGenerator;
        util::LongSequenceGenerator tempDestinationIds;
        util::LongSequenceGenerator localTransactionIds;

        std::string brokerURL;

        bool clientIDSet;
        bool isConnectionInfoSentToBroker;
        bool userSpecifiedClientID;

        decaf::util::concurrent::Mutex ensureConnectionInfoSentMutex;
        decaf::util::concurrent::Mutex onExceptionLock;
        decaf::util::concurrent::Mutex mutex;

        bool dispatchAsync;
        bool alwaysSyncSend;
        bool useAsyncSend;
        bool sendAcksAsync;
        bool messagePrioritySupported;
        bool watchTopicAdvisories;
        bool useCompression;
        bool useRetroactiveConsumer;
        bool checkForDuplicates;
        bool optimizeAcknowledge;
        bool exclusiveConsumer;
        bool transactedIndividualAck;
        bool nonBlockingRedelivery;
        bool alwaysSessionAsync;
        int compressionLevel;
        unsigned int sendTimeout;
        unsigned int closeTimeout;
        unsigned int producerWindowSize;
        int auditDepth;
        int auditMaximumProducerNumber;
        long long optimizeAcknowledgeTimeOut;
        long long optimizedAckScheduledAckInterval;
        long long consumerFailoverRedeliveryWaitPeriod;
        bool consumerExpiryCheckEnabled;

        std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
        std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;

        cms::ExceptionListener* exceptionListener;
        cms::MessageTransformer* transformer;

        Pointer<commands::ConnectionInfo> connectionInfo;
        Pointer<commands::BrokerInfo> brokerInfo;
        Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
        Pointer<AtomicInteger> transportInterruptionProcessingComplete;
        Pointer<AtomicInteger> protocolVersion;
        Pointer<CountDownLatch> brokerInfoReceived;
        Pointer<AdvisoryConsumer> advisoryConsumer;

        Pointer<Exception> firstFailureError;

        DispatcherMap dispatchers;
        ProducerMap activeProducers;

        decaf::util::concurrent::locks::ReentrantReadWriteLock sessionsLock;
        decaf::util::LinkedList< Pointer<ActiveMQSessionKernel> > activeSessions;
        decaf::util::LinkedList<transport::TransportListener*> transportListeners;

        TempDestinationMap activeTempDestinations;

        ConnectionAudit connectionAudit;

        ConnectionConfig(const Pointer<transport::Transport> transport,
                         const Pointer<decaf::util::Properties> properties) :
                             properties(properties),
                             transport(transport),
                             clientIdGenerator(),
                             scheduler(),
                             executor(),
                             sessionIds(),
                             consumerIdGenerator(),
                             tempDestinationIds(),
                             localTransactionIds(),
                             brokerURL(""),
                             clientIDSet(false),
                             isConnectionInfoSentToBroker(false),
                             userSpecifiedClientID(false),
                             ensureConnectionInfoSentMutex(),
                             onExceptionLock(),
                             mutex(),
                             dispatchAsync(true),
                             alwaysSyncSend(false),
                             useAsyncSend(false),
                             sendAcksAsync(true),
                             messagePrioritySupported(true),
                             watchTopicAdvisories(true),
                             useCompression(false),
                             useRetroactiveConsumer(false),
                             checkForDuplicates(true),
                             optimizeAcknowledge(false),
                             exclusiveConsumer(false),
                             transactedIndividualAck(false),
                             nonBlockingRedelivery(false),
                             alwaysSessionAsync(true),
                             compressionLevel(-1),
                             sendTimeout(0),
                             closeTimeout(15000),
                             producerWindowSize(0),
                             auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
                             auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT),
                             optimizeAcknowledgeTimeOut(300),
                             optimizedAckScheduledAckInterval(0),
                             consumerFailoverRedeliveryWaitPeriod(0),
                             consumerExpiryCheckEnabled(true),
                             defaultPrefetchPolicy(NULL),
                             defaultRedeliveryPolicy(NULL),
                             exceptionListener(NULL),
                             transformer(NULL),
                             connectionInfo(),
                             brokerInfo(),
                             brokerWireFormatInfo(),
                             transportInterruptionProcessingComplete(),
                             brokerInfoReceived(),
                             advisoryConsumer(),
                             firstFailureError(),
                             dispatchers(),
                             activeProducers(),
                             sessionsLock(),
                             activeSessions(),
                             transportListeners(),
                             activeTempDestinations() {

            this->defaultPrefetchPolicy.reset(new DefaultPrefetchPolicy());
            this->defaultRedeliveryPolicy.reset(new DefaultRedeliveryPolicy());
            this->clientIdGenerator.reset(new util::IdGenerator);
            this->connectionInfo.reset(new ConnectionInfo());
            this->brokerInfoReceived.reset(new CountDownLatch(1));

            // Generate a connectionId
            std::string uniqueId = CONNECTION_ID_GENERATOR.generateId();
            decaf::lang::Pointer<ConnectionId> connectionId(new ConnectionId());
            connectionId->setValue(uniqueId);

            this->transportInterruptionProcessingComplete.reset(new AtomicInteger());
            this->protocolVersion.reset(new AtomicInteger(OpenWireFormat::MAX_SUPPORTED_VERSION));
            this->executor.reset(
                new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS,
                    new LinkedBlockingQueue<Runnable*>(),
                    new ConnectionThreadFactory(connectionId->toString())));

            this->connectionInfo->setConnectionId(connectionId);
            this->scheduler.reset(new Scheduler(std::string("ActiveMQConnection[")+uniqueId+"] Scheduler"));
            this->scheduler->start();
        }

        ~ConnectionConfig() {
            try {
                synchronized(&onExceptionLock) {
                    this->scheduler->shutdown();
                    this->executor->shutdown();
                    this->executor->awaitTermination(10, TimeUnit::MINUTES);
                }
            }
            AMQ_CATCHALL_NOTHROW()
        }

        void waitForBrokerInfo() {
            this->brokerInfoReceived->await();
        }
    };

    // Static init.
    util::IdGenerator ConnectionConfig::CONNECTION_ID_GENERATOR;
    DefaultTransportListener ConnectionConfig::DO_NOTHING_TRANSPORT_LISTENER;

    class ConnectionErrorRunnable : public Runnable {
    private:

        ActiveMQConnection* connection;
        Pointer<ConnectionError> error;

    private:

        ConnectionErrorRunnable(const ConnectionErrorRunnable&);
        ConnectionErrorRunnable& operator= (const ConnectionErrorRunnable&);

    public:

        ConnectionErrorRunnable(ActiveMQConnection* connection, Pointer<ConnectionError> error) :
            Runnable(), connection(connection), error(error) {}
        virtual ~ConnectionErrorRunnable() {}

        virtual void run() {
            try {
                if (error != NULL && error->getException() != NULL) {
                    this->connection->onAsyncException(error->getException()->createExceptionObject());
                }
            } catch(Exception& ex) {}
        }
    };

    class OnAsyncExceptionRunnable : public Runnable {
    private:

        ActiveMQConnection* connection;
        Exception ex;

    private:

        OnAsyncExceptionRunnable(const OnAsyncExceptionRunnable&);
        OnAsyncExceptionRunnable& operator= (const OnAsyncExceptionRunnable&);

    public:

        OnAsyncExceptionRunnable(ActiveMQConnection* connection, const Exception& ex) :
            Runnable(), connection(connection), ex(ex) {}
        virtual ~OnAsyncExceptionRunnable() {}

        virtual void run() {
            try {
                cms::ExceptionListener* listener = this->connection->getExceptionListener();
                if (listener != NULL) {

                    const cms::CMSException* cause = dynamic_cast<const cms::CMSException*>(ex.getCause());
                    if (cause != NULL) {
                        listener->onException(*cause);
                    } else {
                        ActiveMQException amqEx(ex);
                        listener->onException(amqEx.convertToCMSException());
                    }
                }
            } catch(Exception& ex) {}
        }
    };

    class OnExceptionRunnable : public Runnable {
    private:

        ActiveMQConnection* connection;
        ConnectionConfig* config;
        Pointer<Exception> ex;

    private:

        OnExceptionRunnable(const OnExceptionRunnable&);
        OnExceptionRunnable& operator= (const OnExceptionRunnable&);

    public:

        OnExceptionRunnable(ActiveMQConnection* connection, ConnectionConfig* config, Exception* ex) :
            Runnable(), connection(connection), config(config), ex(ex) {}
        virtual ~OnExceptionRunnable() {}

        virtual void run() {
            try {

                // Take control of this pointer, it will be given to the Connection who
                // will destroy it when it closes.
                Exception* error = ex.release();

                // Mark this Connection as having a Failed transport.
                this->connection->setFirstFailureError(error);

                Pointer<Transport> transport = this->config->transport;
                if (transport != NULL) {
                    try {
                        transport->stop();
                    } catch(...) {
                    }
                }

                this->config->brokerInfoReceived->countDown();

                // Clean up the Connection resources.
                this->connection->cleanup();

                synchronized(&this->config->transportListeners) {
                    Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );

                    while (iter->hasNext()) {
                        try {
                            iter->next()->onException(*error);
                        } catch(...) {}
                    }
                }
            } catch(Exception& ex) {}
        }
    };

    class AsyncResponseCallback : public ResponseCallback {
    private:

        ConnectionConfig* config;
        cms::AsyncCallback* callback;

    private:

        AsyncResponseCallback(const AsyncResponseCallback&);
        AsyncResponseCallback& operator= (const AsyncResponseCallback&);

    public:

        AsyncResponseCallback(ConnectionConfig* config, cms::AsyncCallback* callback) :
            ResponseCallback(), config(config), callback(callback) {
        }

        virtual ~AsyncResponseCallback() {
        }

        virtual void onComplete(Pointer<commands::Response> response) {

            commands::ExceptionResponse* exceptionResponse =
                dynamic_cast<ExceptionResponse*> (response.get());

            if (exceptionResponse != NULL) {

                Exception ex = exceptionResponse->getException()->createExceptionObject();
                const cms::CMSException* cmsError = dynamic_cast<const cms::CMSException*>(ex.getCause());
                if (cmsError != NULL) {
                    this->callback->onException(*cmsError);
                } else {
                    BrokerException error = BrokerException(__FILE__, __LINE__, exceptionResponse->getException()->getMessage().c_str());
                    this->callback->onException(error.convertToCMSException());
                }
            } else {
                this->callback->onSuccess();
            }
        }
    };

}}

////////////////////////////////////////////////////////////////////////////////
ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport> transport,
                                       const Pointer<decaf::util::Properties> properties) :
    config(NULL), connectionMetaData(new ActiveMQConnectionMetaData()), started(false),
    closed(false), closing(false), transportFailed(false) {

    Pointer<ConnectionConfig> configuration(
            new ConnectionConfig(transport, properties));

    // Register for messages and exceptions from the connector.
    transport->setTransportListener(this);

    // Set the initial state of the ConnectionInfo
    configuration->connectionInfo->setManageable(true);
    configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());

    configuration->connectionAudit.setCheckForDuplicates(transport->isFaultTolerant());

    this->config = configuration.release();
}

////////////////////////////////////////////////////////////////////////////////
ActiveMQConnection::~ActiveMQConnection() {

    try {
        this->close();
    }
    AMQ_CATCHALL_NOTHROW()

    try {
        // This must happen even if exceptions occur in the Close attempt.
        delete this->config;
    }
    AMQ_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) {

    try {
        synchronized(&this->config->dispatchers) {
            this->config->dispatchers.put(consumer, dispatcher);
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer) {

    try {
        synchronized(&this->config->dispatchers) {
            this->config->dispatchers.remove(consumer);
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
cms::Session* ActiveMQConnection::createSession() {
    try {
        return createSession(Session::AUTO_ACKNOWLEDGE);
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
cms::Session* ActiveMQConnection::createSession(cms::Session::AcknowledgeMode ackMode) {

    try {

        checkClosedOrFailed();
        ensureConnectionInfoSent();

        // Create the session instance as a Session Kernel we then create and return a
        // ActiveMQSession instance that acts as a proxy to the kernel caller can delete
        // that at any time since we only refer to the Pointer to the session kernel.
        Pointer<ActiveMQSessionKernel> session(
            new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties));

        session->setMessageTransformer(this->config->transformer);

        this->addSession(session);

        return new ActiveMQSession(session);
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
Pointer<SessionId> ActiveMQConnection::getNextSessionId() {

    decaf::lang::Pointer<SessionId> sessionId(new SessionId());
    sessionId->setConnectionId(this->config->connectionInfo->getConnectionId()->getValue());
    sessionId->setValue(this->config->sessionIds.getNextSequenceId());

    return sessionId;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addSession(Pointer<ActiveMQSessionKernel> session) {
    try {
        this->config->sessionsLock.writeLock().lock();
        try {
            this->config->activeSessions.add(session);
            this->config->sessionsLock.writeLock().unlock();
        } catch (Exception& ex) {
            this->config->sessionsLock.writeLock().unlock();
            throw;
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeSession(Pointer<ActiveMQSessionKernel> session) {
    try {
        this->config->sessionsLock.writeLock().lock();
        try {
            this->config->activeSessions.remove(session);
            this->config->connectionAudit.removeDispatcher(session.get());
            this->config->sessionsLock.writeLock().unlock();
        } catch (Exception& ex) {
            this->config->sessionsLock.writeLock().unlock();
            throw;
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addProducer(Pointer<ActiveMQProducerKernel> producer) {

    try {
        synchronized(&this->config->activeProducers) {
            this->config->activeProducers.put(producer->getProducerInfo()->getProducerId(), producer);
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeProducer(const decaf::lang::Pointer<ProducerId>& producerId) {

    try {
        synchronized(&this->config->activeProducers) {
            this->config->activeProducers.remove(producerId);
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
std::string ActiveMQConnection::getClientID() const {

    if (this->isClosed()) {
        return "";
    }

    return this->config->connectionInfo->getClientId();
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setClientID(const std::string& clientID) {

    if (this->closed.get()) {
        throw cms::IllegalStateException("Connection is already closed", NULL);
    }

    if (this->config->clientIDSet) {
        throw cms::IllegalStateException("Client ID is already set", NULL);
    }

    if (this->config->isConnectionInfoSentToBroker) {
        throw cms::IllegalStateException("Cannot set client Id on a Connection already in use.", NULL);
    }

    if (clientID.empty()) {
        throw cms::InvalidClientIdException("Client ID cannot be an empty string", NULL);
    }

    this->config->connectionInfo->setClientId(clientID);
    this->config->userSpecifiedClientID = true;

    try {
        ensureConnectionInfoSent();
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setDefaultClientId(const std::string& clientId) {
    this->setClientID(clientId);
    this->config->userSpecifiedClientID = true;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::close() {

    try {

        if (this->isClosed()) {
            return;
        }

        Exception ex;
        bool hasException = false;

        // If we are running lets stop first.
        if (!this->transportFailed.get()) {
            try {
                this->stop();
            } catch (cms::CMSException& error) {
                if (!hasException) {
                    ex = ActiveMQException(error.clone());
                    hasException = true;
                }
            }
        }

        // Indicates we are on the way out to suppress any exceptions getting
        // passed on from the transport as it goes down.
        this->closing.set(true);

        if (this->config->scheduler != NULL) {
            try {
                this->config->scheduler->stop();
            } catch (Exception& error) {
                if (!hasException) {
                    ex = error;
                    ex.setMark(__FILE__, __LINE__);
                    hasException = true;
                }
            }
        }

        long long lastDeliveredSequenceId = 0;

        // Get the complete list of active sessions.
        try {
            this->config->sessionsLock.writeLock().lock();

            // We need to use a copy since we aren't able to use CopyOnWriteArrayList
            ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions);
            std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());

            // Dispose of all the Session resources we know are still open.
            while (iter->hasNext()) {
                Pointer<ActiveMQSessionKernel> session = iter->next();
                try {
                    session->dispose();
                    lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
                } catch (cms::CMSException& ex) {
                }
            }

            this->config->activeSessions.clear();
            this->config->sessionsLock.writeLock().unlock();
        } catch (Exception& error) {
            this->config->sessionsLock.writeLock().unlock();
            if (!hasException) {
                ex = error;
                ex.setMark(__FILE__, __LINE__);
                hasException = true;
            }
        }

        // As TemporaryQueue and TemporaryTopic instances are bound to a connection
        // we should just delete them after the connection is closed to free up memory
        if (this->config->advisoryConsumer != NULL) {
            this->config->advisoryConsumer->dispose();
        }

        ArrayList<Pointer<ActiveMQTempDestination> > tempDests(this->config->activeTempDestinations.values());
        Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(tempDests.iterator());

        try {
            while (iterator->hasNext()) {
                Pointer<ActiveMQTempDestination> dest = iterator->next();
                dest->close();
            }
        } catch (cms::CMSException& error) {
            if (!hasException) {
                ex = ActiveMQException(error.clone());
                hasException = true;
            }
        }

        try {
            if (this->config->executor != NULL) {
                this->config->executor->shutdown();
            }
        } catch (Exception& error) {
            if (!hasException) {
                ex = error;
                ex.setMark(__FILE__, __LINE__);
                hasException = true;
            }
        }

        // Now inform the Broker we are shutting down.
        try {
            this->disconnect(lastDeliveredSequenceId);
        } catch (Exception& error) {
            if (!hasException) {
                ex = error;
                ex.setMark(__FILE__, __LINE__);
                hasException = true;
            }
        }

        // Once current deliveries are done this stops the delivery
        // of any new messages.
        this->started.set(false);
        this->closed.set(true);

        if (hasException) {
            throw ex;
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::cleanup() {

    try {

        this->config->sessionsLock.writeLock().lock();
        try {
            // We need to use a copy since we aren't able to use CopyOnWriteArrayList
            ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions);
            std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());

            // Dispose of all the Session resources we know are still open.
            while (iter->hasNext()) {
                Pointer<ActiveMQSessionKernel> session = iter->next();
                try {
                    session->dispose();
                } catch (cms::CMSException& ex) {
                    /* Absorb */
                }
            }
            this->config->activeSessions.clear();
            this->config->sessionsLock.writeLock().unlock();
        } catch (Exception& ex) {
            this->config->sessionsLock.writeLock().unlock();
            throw;
        }

        if (this->config->isConnectionInfoSentToBroker) {
            if (!transportFailed.get() && !closing.get()) {
                this->syncRequest(this->config->connectionInfo->createRemoveCommand());
            }
            this->config->isConnectionInfoSentToBroker = false;
        }

        if (this->config->userSpecifiedClientID) {
            this->config->connectionInfo->setClientId("");
            this->config->userSpecifiedClientID = false;
        }

        this->config->clientIDSet = false;
        this->started.set(false);
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::start() {

    try {

        checkClosedOrFailed();
        ensureConnectionInfoSent();

        try {
            // This starts or restarts the delivery of all incoming messages
            // messages delivered while this connection is stopped are dropped
            // and not acknowledged.
            if (this->started.compareAndSet(false, true)) {
                this->config->sessionsLock.readLock().lock();

                // Start all the sessions.
                std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
                while (iter->hasNext()) {
                    iter->next()->start();
                }

                this->config->sessionsLock.readLock().unlock();
            }
        } catch (Exception& ex) {
            this->config->sessionsLock.readLock().unlock();
            throw;
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::stop() {

    try {

        checkClosedOrFailed();

        try {
            // Once current deliveries are done this stops the delivery of any
            // new messages.
            if (this->started.compareAndSet(true, false)) {
                this->config->sessionsLock.readLock().lock();
                std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());

                while (iter->hasNext()) {
                    iter->next()->stop();
                }
                this->config->sessionsLock.readLock().unlock();
            }
        } catch (Exception& ex) {
            this->config->sessionsLock.readLock().unlock();
            throw;
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::disconnect(long long lastDeliveredSequenceId) {

    try {

        // Clear the listener, we don't care about async errors at this point.
        this->config->transport->setTransportListener(&ConnectionConfig::DO_NOTHING_TRANSPORT_LISTENER);

        // Allow the Support class to shutdown its resources, including the Transport.
        bool hasException = false;
        exceptions::ActiveMQException e;

        if (this->config->isConnectionInfoSentToBroker) {

            try {
                // Remove our ConnectionId from the Broker
                Pointer<RemoveInfo> command(this->config->connectionInfo->createRemoveCommand());
                command->setLastDeliveredSequenceId(lastDeliveredSequenceId);
                this->syncRequest(command, this->config->closeTimeout);
            } catch (exceptions::ActiveMQException& ex) {
                if (!hasException) {
                    hasException = true;
                    ex.setMark(__FILE__, __LINE__);
                    e = ex;
                }
            }

            try {
                // Send the disconnect command to the broker.
                Pointer<ShutdownInfo> shutdown(new ShutdownInfo());
                oneway(shutdown);
            } catch (exceptions::ActiveMQException& ex) {
                if (!hasException) {
                    hasException = true;
                    ex.setMark(__FILE__, __LINE__);
                    e = ex;
                }
            }
        }

        if (this->config->transport != NULL) {

            try {
                this->config->transport->close();
            } catch (exceptions::ActiveMQException& ex) {
                if (!hasException) {
                    hasException = true;
                    ex.setMark(__FILE__, __LINE__);
                    e = ex;
                }
            }
        }

        // If we encountered an exception - throw the first one we encountered.
        // This will preserve the stack trace for logging purposes.
        if (hasException) {
            throw e;
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::sendPullRequest(const ConsumerInfo* consumer, long long timeout) {

    try {

        if (consumer->getPrefetchSize() == 0) {

            Pointer<MessagePull> messagePull(new MessagePull());
            messagePull->setConsumerId(consumer->getConsumerId());
            messagePull->setDestination(consumer->getDestination());
            messagePull->setTimeout(timeout);

            this->oneway(messagePull);
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::destroyDestination(const ActiveMQDestination* destination) {

    try {

        if (destination == NULL) {
            throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
        }

        checkClosedOrFailed();
        ensureConnectionInfoSent();

        Pointer<DestinationInfo> command(new DestinationInfo());

        command->setConnectionId(this->config->connectionInfo->getConnectionId());
        command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
        command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure()));

        // Send the message to the broker.
        syncRequest(command);
    }
    AMQ_CATCH_RETHROW(NullPointerException)
    AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::destroyDestination(const cms::Destination* destination) {

    try {

        if (destination == NULL) {
            throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
        }

        checkClosedOrFailed();
        ensureConnectionInfoSent();

        const ActiveMQDestination* amqDestination = dynamic_cast<const ActiveMQDestination*>(destination);

        this->destroyDestination(amqDestination);
    }
    AMQ_CATCH_RETHROW(NullPointerException)
    AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onCommand(const Pointer<Command> command) {

    try {

        if (command->isMessageDispatch()) {

            Pointer<MessageDispatch> dispatch = command.dynamicCast<MessageDispatch>();

            // Check first to see if we are recovering.
            waitForTransportInterruptionProcessingToComplete();

            // Look up the dispatcher.
            Dispatcher* dispatcher = NULL;
            synchronized(&this->config->dispatchers) {

                dispatcher = this->config->dispatchers.get(dispatch->getConsumerId());

                // If we have no registered dispatcher, the consumer was probably
                // just closed.
                if (dispatcher != NULL) {

                    Pointer<commands::Message> message = dispatch->getMessage();

                    // Message == NULL to signal the end of a Queue Browse.
                    if (message != NULL) {
                        message->setReadOnlyBody(true);
                        message->setReadOnlyProperties(true);
                        message->setRedeliveryCounter(dispatch->getRedeliveryCounter());
                        message->setConnection(this);
                    }

                    dispatcher->dispatch(dispatch);
                }
            }

        } else if (command->isProducerAck()) {

            ProducerAck* producerAck = dynamic_cast<ProducerAck*>(command.get());

            // Get the consumer info object for this consumer.
            Pointer<ActiveMQProducerKernel> producer;
            synchronized(&this->config->activeProducers) {
                producer = this->config->activeProducers.get(producerAck->getProducerId());
                if (producer != NULL) {
                    producer->onProducerAck(*producerAck);
                }
            }

        } else if (command->isWireFormatInfo()) {
            this->onWireFormatInfo(command);
        } else if (command->isBrokerInfo()) {
            this->config->brokerInfo = command.dynamicCast<BrokerInfo>();
            this->config->brokerInfoReceived->countDown();
        } else if (command->isConnectionControl()) {
            this->onConnectionControl(command);
        } else if (command->isControlCommand()) {
            this->onControlCommand(command);
        } else if (command->isConnectionError()) {

            Pointer<ConnectionError> connectionError = command.dynamicCast<ConnectionError>();
            this->config->executor->execute(new ConnectionErrorRunnable(this, connectionError));

        } else if (command->isConsumerControl()) {
            this->onConsumerControl(command);
        }

        synchronized(&this->config->transportListeners) {
            Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
            while (iter->hasNext()) {
                try {
                    iter->next()->onCommand(command);
                } catch (...) {
                }
            }
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onWireFormatInfo(Pointer<commands::Command> command AMQCPP_UNUSED) {
    this->config->brokerWireFormatInfo = command.dynamicCast<WireFormatInfo>();
    this->config->protocolVersion->set(this->config->brokerWireFormatInfo->getVersion());
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onControlCommand(Pointer<commands::Command> command AMQCPP_UNUSED) {
    // Don't need to do anything yet as close and shutdown are applicable yet.
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onConnectionControl(Pointer<commands::Command> command AMQCPP_UNUSED) {
    // Don't need to do anything yet as we don't support optimizeAcknowledge.
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onConsumerControl(Pointer<commands::Command> command) {

    Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>();

    this->config->sessionsLock.readLock().lock();
    try {
        // Get the complete list of active sessions.
        std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());

        while (iter->hasNext()) {
            Pointer<ActiveMQSessionKernel> session = iter->next();
            if (consumerControl->isClose()) {
                session->close(consumerControl->getConsumerId());
            } else {
                session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
            }
        }
        this->config->sessionsLock.readLock().unlock();
    } catch (Exception& ex) {
        this->config->sessionsLock.readLock().unlock();
        throw;
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onException(const decaf::lang::Exception& ex) {

    try {

        // Sync with the config destructor in case a client attempt to
        synchronized(&this->config->onExceptionLock) {
            onAsyncException(ex);

            // We're disconnected - the asynchronous error is expected.
            if (!this->isClosed() || !this->closing.get()) {
                this->config->executor->execute(new OnExceptionRunnable(this, config, ex.clone()));
            }
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onAsyncException(const decaf::lang::Exception& ex) {

    if (!this->isClosed() || !this->closing.get()) {

        if (this->config->exceptionListener != NULL) {
            this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex));
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onClientInternalException(const decaf::lang::Exception& ex) {

    if (!closed.get() && !closing.get()) {

        if (this->config->exceptionListener != NULL) {
            this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex));
        }

        // TODO Turn this into an invocation on a special ClientInternalExceptionListener
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::transportInterrupted() {

    this->config->transportInterruptionProcessingComplete->set(0);

    this->config->sessionsLock.readLock().lock();
    try {
        std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
        while (sessions->hasNext()) {
            sessions->next()->clearMessagesInProgress(this->config->transportInterruptionProcessingComplete);
        }
        this->config->sessionsLock.readLock().unlock();
    } catch (Exception& ex) {
        this->config->sessionsLock.readLock().unlock();
        throw;
    }

    synchronized(&this->config->transportListeners) {
        Pointer<Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
        while (listeners->hasNext()) {
            try {
                listeners->next()->transportInterrupted();
            } catch (...) {
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::transportResumed() {

    synchronized(&this->config->transportListeners) {
        Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
        while (iter->hasNext()) {
            try {
                iter->next()->transportResumed();
            } catch (...) {
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::oneway(Pointer<Command> command) {

    try {
        checkClosedOrFailed();
        this->config->transport->oneway(command);
    }
    AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
Pointer<Response> ActiveMQConnection::syncRequest(Pointer<Command> command, unsigned int timeout) {

    try {

        checkClosedOrFailed();

        Pointer<Response> response;

        if (timeout == 0) {
            response = this->config->transport->request(command);
        } else {
            response = this->config->transport->request(command, timeout);
        }

        commands::ExceptionResponse* exceptionResponse = dynamic_cast<ExceptionResponse*>(response.get());

        if (exceptionResponse != NULL) {
            throw exceptionResponse->getException()->createExceptionObject();
        }

        return response;
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::asyncRequest(Pointer<Command> command, cms::AsyncCallback* onComplete) {

    try {

        if (onComplete == NULL) {
            this->syncRequest(command);
            return;
        }

        checkClosedOrFailed();

        Pointer<ResponseCallback> callback(new AsyncResponseCallback(this->config, onComplete));
        this->config->transport->asyncRequest(command, callback);
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::checkClosed() const {
    if (this->isClosed()) {
        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConnection::enforceConnected - Connection has already been closed!");
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::checkClosedOrFailed() const {

    checkClosed();
    if (this->transportFailed.get() == true) {
        throw ConnectionFailedException(*this->config->firstFailureError);
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::ensureConnectionInfoSent() {

    try {

        // Can we skip sending the ConnectionInfo packet, cheap test
        if (this->config->isConnectionInfoSentToBroker || closed.get()) {
            return;
        }

        synchronized(&( this->config->ensureConnectionInfoSentMutex)) {

            // Can we skip sending the ConnectionInfo packet??
            if (this->config->isConnectionInfoSentToBroker || closed.get()) {
                return;
            }

            // check for a user specified Id
            if (!this->config->userSpecifiedClientID) {
                this->config->connectionInfo->setClientId(this->config->clientIdGenerator->generateId());
            }

            // Now we ping the broker and see if we get an ack / nack
            syncRequest(this->config->connectionInfo);

            this->config->isConnectionInfoSentToBroker = true;

            Pointer<SessionId> sessionId(new SessionId(this->config->connectionInfo->getConnectionId().get(), -1));
            Pointer<ConsumerId> consumerId(new ConsumerId(*sessionId, this->config->consumerIdGenerator.getNextSequenceId()));
            if (this->config->watchTopicAdvisories) {
                this->config->advisoryConsumer.reset(new AdvisoryConsumer(this, consumerId));
            }
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::fire(const ActiveMQException& ex) {
    if (this->config->exceptionListener != NULL) {
        try {
            this->config->exceptionListener->onException(ex.convertToCMSException());
        } catch (...) {
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
const ConnectionInfo& ActiveMQConnection::getConnectionInfo() const {
    checkClosed();
    return *this->config->connectionInfo;
}

////////////////////////////////////////////////////////////////////////////////
const ConnectionId& ActiveMQConnection::getConnectionId() const {
    checkClosed();
    return *(this->config->connectionInfo->getConnectionId());
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addTransportListener(TransportListener* transportListener) {

    if (transportListener == NULL) {
        return;
    }

    // Add this listener from the set of active TransportListeners
    synchronized(&this->config->transportListeners) {
        this->config->transportListeners.add(transportListener);
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeTransportListener(TransportListener* transportListener) {

    if (transportListener == NULL) {
        return;
    }

    // Remove this listener from the set of active TransportListeners
    synchronized(&this->config->transportListeners) {
        this->config->transportListeners.remove(transportListener);
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete() {

    while (!closed.get() && !transportFailed.get() && this->config->transportInterruptionProcessingComplete->get() > 0) {
        signalInterruptionProcessingComplete();
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setTransportInterruptionProcessingComplete() {
    if (this->config->transportInterruptionProcessingComplete->decrementAndGet() == 0) {
        signalInterruptionProcessingComplete();
    }
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::signalInterruptionProcessingComplete() {

    FailoverTransport* failoverTransport =
        dynamic_cast<FailoverTransport*>(this->config->transport->narrow(typeid(FailoverTransport)));

    if (failoverTransport != NULL) {
        failoverTransport->setConnectionInterruptProcessingComplete(
            this->config->connectionInfo->getConnectionId());
    }

    this->config->transportInterruptionProcessingComplete->set(0);
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setUsername(const std::string& username) {
    this->config->connectionInfo->setUserName(username);
}

////////////////////////////////////////////////////////////////////////////////
const std::string& ActiveMQConnection::getUsername() const {
    return this->config->connectionInfo->getUserName();
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setPassword(const std::string& password) {
    this->config->connectionInfo->setPassword(password);
}

////////////////////////////////////////////////////////////////////////////////
const std::string& ActiveMQConnection::getPassword() const {
    return this->config->connectionInfo->getPassword();
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setBrokerURL(const std::string& brokerURL) {
    this->config->brokerURL = brokerURL;
}

////////////////////////////////////////////////////////////////////////////////
const std::string& ActiveMQConnection::getBrokerURL() const {
    return this->config->brokerURL;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setExceptionListener(cms::ExceptionListener* listener) {
    this->config->exceptionListener = listener;
}

////////////////////////////////////////////////////////////////////////////////
cms::ExceptionListener* ActiveMQConnection::getExceptionListener() const {
    return this->config->exceptionListener;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setMessageTransformer(cms::MessageTransformer* transformer) {
    this->config->transformer = transformer;
}

////////////////////////////////////////////////////////////////////////////////
cms::MessageTransformer* ActiveMQConnection::getMessageTransformer() const {
    return this->config->transformer;
}

////////////////////////////////////////////////////////////////////////////////
cms::DestinationSource* ActiveMQConnection::getDestinationSource() {
    return new ActiveMQDestinationSource(this);
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setPrefetchPolicy(PrefetchPolicy* policy) {
    this->config->defaultPrefetchPolicy.reset(policy);
}

////////////////////////////////////////////////////////////////////////////////
PrefetchPolicy* ActiveMQConnection::getPrefetchPolicy() const {
    return this->config->defaultPrefetchPolicy.get();
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setRedeliveryPolicy(RedeliveryPolicy* policy) {
    this->config->defaultRedeliveryPolicy.reset(policy);
}

////////////////////////////////////////////////////////////////////////////////
RedeliveryPolicy* ActiveMQConnection::getRedeliveryPolicy() const {
    return this->config->defaultRedeliveryPolicy.get();
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isDispatchAsync() const {
    return this->config->dispatchAsync;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setDispatchAsync(bool value) {
    this->config->dispatchAsync = value;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isAlwaysSyncSend() const {
    return this->config->alwaysSyncSend;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setAlwaysSyncSend(bool value) {
    this->config->alwaysSyncSend = value;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isUseAsyncSend() const {
    return this->config->useAsyncSend;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setUseAsyncSend(bool value) {
    this->config->useAsyncSend = value;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isUseCompression() const {
    return this->config->useCompression;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setUseCompression(bool value) {
    this->config->useCompression = value;
}

////////////////////////////////////////////////////////////////////////////////
int ActiveMQConnection::getCompressionLevel() const {
    return this->config->compressionLevel;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setCompressionLevel(int value) {

    if (value < 0) {
        this->config->compressionLevel = -1;
    }

    this->config->compressionLevel = Math::min(value, 9);
}

////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnection::getSendTimeout() const {
    return this->config->sendTimeout;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setSendTimeout(unsigned int timeout) {
    this->config->sendTimeout = timeout;
}

////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnection::getCloseTimeout() const {
    return this->config->closeTimeout;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setCloseTimeout(unsigned int timeout) {
    this->config->closeTimeout = timeout;
}

////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnection::getProducerWindowSize() const {
    return this->config->producerWindowSize;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setProducerWindowSize(unsigned int windowSize) {
    this->config->producerWindowSize = windowSize;
}

////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConnection::getNextTempDestinationId() {
    return this->config->tempDestinationIds.getNextSequenceId();
}

////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConnection::getNextLocalTransactionId() {
    return this->config->localTransactionIds.getNextSequenceId();
}

////////////////////////////////////////////////////////////////////////////////
transport::Transport& ActiveMQConnection::getTransport() const {
    return *(this->config->transport);
}

////////////////////////////////////////////////////////////////////////////////
Pointer<Scheduler> ActiveMQConnection::getScheduler() const {
    return this->config->scheduler;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isMessagePrioritySupported() const {
    return this->config->messagePrioritySupported;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setMessagePrioritySupported(bool value) {
    this->config->messagePrioritySupported = value;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setFirstFailureError(decaf::lang::Exception* error) {

    this->transportFailed.set(true);

    if (this->config->firstFailureError == NULL) {
        this->config->firstFailureError.reset(error);
    } else {
        delete error;
    }
}

////////////////////////////////////////////////////////////////////////////////
decaf::lang::Exception* ActiveMQConnection::getFirstFailureError() const {
    return this->config->firstFailureError.get();
}

////////////////////////////////////////////////////////////////////////////////
std::string ActiveMQConnection::getResourceManagerId() const {
    try {
        this->config->waitForBrokerInfo();

        if (this->config->brokerInfo == NULL) {
            throw CMSException("Connection failed before Broker info was received.");
        }

        return this->config->brokerInfo->getBrokerId()->getValue();
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}

////////////////////////////////////////////////////////////////////////////////
const decaf::util::Properties& ActiveMQConnection::getProperties() const {
    return *(this->config->properties);
}

////////////////////////////////////////////////////////////////////////////////
ExecutorService* ActiveMQConnection::getExecutor() const {
    return this->config->executor.get();
}

////////////////////////////////////////////////////////////////////////////////
ArrayList< Pointer<ActiveMQSessionKernel> > ActiveMQConnection::getSessions() const {
    ArrayList< Pointer<ActiveMQSessionKernel> > result;

    this->config->sessionsLock.readLock().lock();
    try {
        result.addAll(this->config->activeSessions);
        this->config->sessionsLock.readLock().unlock();
    } catch (Exception& ex) {
        this->config->sessionsLock.readLock().unlock();
        throw;
    }

    return result;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isWatchTopicAdvisories() const {
    return this->config->watchTopicAdvisories;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setWatchTopicAdvisories(bool value) {
    this->config->watchTopicAdvisories = value;
}

////////////////////////////////////////////////////////////////////////////////
int ActiveMQConnection::getAuditDepth() const {
    return this->config->auditDepth;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setAuditDepth(int auditDepth) {
    this->config->auditDepth = auditDepth;
}

////////////////////////////////////////////////////////////////////////////////
int ActiveMQConnection::getAuditMaximumProducerNumber() const {
    return this->config->auditMaximumProducerNumber;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
    this->config->auditMaximumProducerNumber = auditMaximumProducerNumber;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isCheckForDuplicates() const {
    return this->config->checkForDuplicates;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setCheckForDuplicates(bool checkForDuplicates) {
    this->config->checkForDuplicates = checkForDuplicates;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isSendAcksAsync() const {
    return this->config->sendAcksAsync;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setSendAcksAsync(bool sendAcksAsync) {
    this->config->sendAcksAsync = sendAcksAsync;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isTransactedIndividualAck() const {
    return this->config->transactedIndividualAck;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setTransactedIndividualAck(bool transactedIndividualAck) {
    this->config->transactedIndividualAck = transactedIndividualAck;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isNonBlockingRedelivery() const {
    return this->config->nonBlockingRedelivery;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setNonBlockingRedelivery(bool nonBlockingRedelivery) {
    this->config->nonBlockingRedelivery = nonBlockingRedelivery;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isOptimizeAcknowledge() const {
    return this->config->optimizeAcknowledge;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setOptimizeAcknowledge(bool optimizeAcknowledge) {
    this->config->optimizeAcknowledge = optimizeAcknowledge;
}

////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConnection::getOptimizeAcknowledgeTimeOut() const {
    return this->config->optimizeAcknowledgeTimeOut;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut) {
    this->config->optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
}

////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConnection::getOptimizedAckScheduledAckInterval() const {
    return this->config->optimizedAckScheduledAckInterval;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval) {
    this->config->optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
}

////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConnection::getConsumerFailoverRedeliveryWaitPeriod() const {
    return this->config->consumerFailoverRedeliveryWaitPeriod;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setConsumerFailoverRedeliveryWaitPeriod(long long value) {
    this->config->consumerFailoverRedeliveryWaitPeriod = value;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isUseRetroactiveConsumer() const {
    return this->config->useRetroactiveConsumer;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setUseRetroactiveConsumer(bool useRetroactiveConsumer) {
    this->config->useRetroactiveConsumer = useRetroactiveConsumer;
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isExclusiveConsumer() const {
    return this->config->exclusiveConsumer;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setExclusiveConsumer(bool exclusiveConsumer) {
    this->config->exclusiveConsumer = exclusiveConsumer;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addTempDestination(Pointer<ActiveMQTempDestination> destination) {
    this->config->activeTempDestinations.put(destination, destination);
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeTempDestination(Pointer<ActiveMQTempDestination> destination) {
    this->config->activeTempDestinations.remove(destination);
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::deleteTempDestination(Pointer<ActiveMQTempDestination> destination) {

    try {

        if (destination == NULL) {
            throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
        }

        checkClosedOrFailed();
        ensureConnectionInfoSent();

        this->config->sessionsLock.readLock().lock();
        try {
            Pointer<Iterator<Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
            while (iterator->hasNext()) {
                Pointer<ActiveMQSessionKernel> session = iterator->next();
                if (session->isInUse(destination)) {
                    this->config->sessionsLock.readLock().unlock();
                    throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination");
                }
            }
            this->config->sessionsLock.readLock().unlock();
        } catch (Exception& ex) {
            this->config->sessionsLock.readLock().unlock();
            throw;
        }

        this->config->activeTempDestinations.remove(destination);

        Pointer<DestinationInfo> command(new DestinationInfo());

        command->setConnectionId(this->config->connectionInfo->getConnectionId());
        command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
        command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure()));

        // Send the message to the broker.
        syncRequest(command);
    }
    AMQ_CATCH_RETHROW(NullPointerException)
    AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::cleanUpTempDestinations() {

    if (this->config->activeTempDestinations.isEmpty()) {
        return;
    }

    ArrayList< Pointer<ActiveMQTempDestination> > tempDests(this->config->activeTempDestinations.values());
    Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(tempDests.iterator());
    while (iterator->hasNext()) {
        Pointer<ActiveMQTempDestination> dest = iterator->next();

        try {
            // Only delete this temporary destination if it was created from this connection, since the
            // advisory consumer tracks all temporary destinations there can be others in our mapping that
            // this connection did not create.
            std::string thisConnectionId =
                    this->config->connectionInfo->getConnectionId() != NULL ? this->config->connectionInfo->getConnectionId()->toString() : "";
            if (dest->getConnectionId() == thisConnectionId) {
                this->deleteTempDestination(dest);
            }
        } catch (Exception& ex) {
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isDeleted(Pointer<ActiveMQTempDestination> destination) const {

    if (this->config->advisoryConsumer == NULL) {
        return false;
    }

    return !this->config->activeTempDestinations.containsKey(destination);
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {

    if (this->config->checkForDuplicates) {
        return this->config->connectionAudit.isDuplicate(dispatcher, message);
    }

    return false;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeAuditedDispatcher(Dispatcher* dispatcher) {
    this->config->connectionAudit.removeDispatcher(dispatcher);
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
    this->config->connectionAudit.rollbackDuplicate(dispatcher, message);
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isAlwaysSessionAsync() const {
    return this->config->alwaysSessionAsync;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) {
    this->config->alwaysSessionAsync = alwaysSessionAsync;
}

////////////////////////////////////////////////////////////////////////////////
int ActiveMQConnection::getProtocolVersion() const {
    return this->config->protocolVersion->get();
}

////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isConsumerExpiryCheckEnabled() {
    return this->config->consumerExpiryCheckEnabled;
}

////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
    this->config->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
}
