| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ActiveMQConnection.h" |
| |
| #include <cms/Session.h> |
| |
| #include <activemq/core/ActiveMQSession.h> |
| #include <activemq/core/ActiveMQConstants.h> |
| #include <activemq/core/ActiveMQConnectionMetaData.h> |
| #include <activemq/core/ActiveMQMessageAudit.h> |
| #include <activemq/core/ActiveMQDestinationSource.h> |
| #include <activemq/core/AdvisoryConsumer.h> |
| #include <activemq/core/ConnectionAudit.h> |
| #include <activemq/core/kernels/ActiveMQSessionKernel.h> |
| #include <activemq/core/kernels/ActiveMQProducerKernel.h> |
| #include <activemq/core/policies/DefaultPrefetchPolicy.h> |
| #include <activemq/core/policies/DefaultRedeliveryPolicy.h> |
| #include <activemq/exceptions/ActiveMQException.h> |
| #include <activemq/exceptions/BrokerException.h> |
| #include <activemq/exceptions/ConnectionFailedException.h> |
| #include <activemq/util/CMSExceptionSupport.h> |
| #include <activemq/util/IdGenerator.h> |
| #include <activemq/transport/failover/FailoverTransport.h> |
| #include <activemq/transport/ResponseCallback.h> |
| #include <activemq/transport/DefaultTransportListener.h> |
| #include <activemq/wireformat/openwire/OpenWireFormat.h> |
| |
| #include <decaf/lang/Math.h> |
| #include <decaf/lang/Boolean.h> |
| #include <decaf/lang/Integer.h> |
| #include <decaf/util/Iterator.h> |
| #include <decaf/util/Set.h> |
| #include <decaf/util/Collection.h> |
| #include <decaf/util/LinkedList.h> |
| #include <decaf/util/UUID.h> |
| #include <decaf/util/concurrent/Mutex.h> |
| #include <decaf/util/concurrent/TimeUnit.h> |
| #include <decaf/util/concurrent/CountDownLatch.h> |
| #include <decaf/util/concurrent/ThreadPoolExecutor.h> |
| #include <decaf/util/concurrent/LinkedBlockingQueue.h> |
| #include <decaf/util/concurrent/locks/ReentrantReadWriteLock.h> |
| #include <decaf/util/concurrent/atomic/AtomicInteger.h> |
| |
| #include <activemq/commands/Command.h> |
| #include <activemq/commands/ActiveMQMessage.h> |
| #include <activemq/commands/BrokerInfo.h> |
| #include <activemq/commands/BrokerError.h> |
| #include <activemq/commands/ConnectionId.h> |
| #include <activemq/commands/DestinationInfo.h> |
| #include <activemq/commands/ExceptionResponse.h> |
| #include <activemq/commands/Message.h> |
| #include <activemq/commands/MessagePull.h> |
| #include <activemq/commands/MessageAck.h> |
| #include <activemq/commands/MessageDispatch.h> |
| #include <activemq/commands/ProducerAck.h> |
| #include <activemq/commands/ProducerInfo.h> |
| #include <activemq/commands/RemoveInfo.h> |
| #include <activemq/commands/ShutdownInfo.h> |
| #include <activemq/commands/SessionInfo.h> |
| #include <activemq/commands/WireFormatInfo.h> |
| |
| using namespace std; |
| using namespace cms; |
| using namespace activemq; |
| using namespace activemq::core; |
| using namespace activemq::core::kernels; |
| using namespace activemq::core::policies; |
| using namespace activemq::commands; |
| using namespace activemq::exceptions; |
| using namespace activemq::threads; |
| using namespace activemq::transport; |
| using namespace activemq::transport::failover; |
| using namespace activemq::wireformat::openwire; |
| using namespace decaf; |
| using namespace decaf::io; |
| using namespace decaf::util; |
| using namespace decaf::util::concurrent; |
| using namespace decaf::util::concurrent::atomic; |
| using namespace decaf::lang; |
| using namespace decaf::lang::exceptions; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| namespace activemq { |
| namespace core { |
| |
| class ConnectionThreadFactory : public ThreadFactory { |
| private: |
| |
| std::string connectionId; |
| |
| public: |
| |
| ConnectionThreadFactory(std::string connectionId) : connectionId(connectionId) { |
| if (connectionId.empty()) { |
| throw NullPointerException(__FILE__, __LINE__, "Connection Id must be set."); |
| } |
| } |
| |
| virtual ~ConnectionThreadFactory() {} |
| |
| virtual Thread* newThread(decaf::lang::Runnable* runnable) { |
| static std::string prefix = "ActiveMQ Connection Executor: "; |
| |
| std::string name = prefix + connectionId; |
| Thread* thread = new Thread(runnable, name); |
| return thread; |
| } |
| |
| }; |
| |
| class ConnectionConfig { |
| private: |
| |
| ConnectionConfig(const ConnectionConfig&); |
| ConnectionConfig& operator=(const ConnectionConfig&); |
| |
| public: |
| |
| typedef decaf::util::StlMap< Pointer<commands::ConsumerId>, |
| Dispatcher*, |
| commands::ConsumerId::COMPARATOR > DispatcherMap; |
| |
| typedef decaf::util::StlMap< Pointer<commands::ProducerId>, |
| Pointer<ActiveMQProducerKernel>, |
| commands::ProducerId::COMPARATOR > ProducerMap; |
| |
| typedef decaf::util::concurrent::ConcurrentStlMap< Pointer<commands::ActiveMQTempDestination>, |
| Pointer<commands::ActiveMQTempDestination>, |
| commands::ActiveMQTempDestination::COMPARATOR > TempDestinationMap; |
| |
| public: |
| |
| static util::IdGenerator CONNECTION_ID_GENERATOR; |
| static DefaultTransportListener DO_NOTHING_TRANSPORT_LISTENER; |
| |
| Pointer<decaf::util::Properties> properties; |
| Pointer<transport::Transport> transport; |
| Pointer<util::IdGenerator> clientIdGenerator; |
| Pointer<Scheduler> scheduler; |
| Pointer<ExecutorService> executor; |
| |
| util::LongSequenceGenerator sessionIds; |
| util::LongSequenceGenerator consumerIdGenerator; |
| util::LongSequenceGenerator tempDestinationIds; |
| util::LongSequenceGenerator localTransactionIds; |
| |
| std::string brokerURL; |
| |
| bool clientIDSet; |
| bool isConnectionInfoSentToBroker; |
| bool userSpecifiedClientID; |
| |
| decaf::util::concurrent::Mutex ensureConnectionInfoSentMutex; |
| decaf::util::concurrent::Mutex onExceptionLock; |
| decaf::util::concurrent::Mutex mutex; |
| |
| bool dispatchAsync; |
| bool alwaysSyncSend; |
| bool useAsyncSend; |
| bool sendAcksAsync; |
| bool messagePrioritySupported; |
| bool watchTopicAdvisories; |
| bool useCompression; |
| bool useRetroactiveConsumer; |
| bool checkForDuplicates; |
| bool optimizeAcknowledge; |
| bool exclusiveConsumer; |
| bool transactedIndividualAck; |
| bool nonBlockingRedelivery; |
| bool alwaysSessionAsync; |
| int compressionLevel; |
| unsigned int sendTimeout; |
| unsigned int closeTimeout; |
| unsigned int producerWindowSize; |
| int auditDepth; |
| int auditMaximumProducerNumber; |
| long long optimizeAcknowledgeTimeOut; |
| long long optimizedAckScheduledAckInterval; |
| long long consumerFailoverRedeliveryWaitPeriod; |
| bool consumerExpiryCheckEnabled; |
| |
| std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy; |
| std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy; |
| |
| cms::ExceptionListener* exceptionListener; |
| cms::MessageTransformer* transformer; |
| |
| Pointer<commands::ConnectionInfo> connectionInfo; |
| Pointer<commands::BrokerInfo> brokerInfo; |
| Pointer<commands::WireFormatInfo> brokerWireFormatInfo; |
| Pointer<AtomicInteger> transportInterruptionProcessingComplete; |
| Pointer<AtomicInteger> protocolVersion; |
| Pointer<CountDownLatch> brokerInfoReceived; |
| Pointer<AdvisoryConsumer> advisoryConsumer; |
| |
| Pointer<Exception> firstFailureError; |
| |
| DispatcherMap dispatchers; |
| ProducerMap activeProducers; |
| |
| decaf::util::concurrent::locks::ReentrantReadWriteLock sessionsLock; |
| decaf::util::LinkedList< Pointer<ActiveMQSessionKernel> > activeSessions; |
| decaf::util::LinkedList<transport::TransportListener*> transportListeners; |
| |
| TempDestinationMap activeTempDestinations; |
| |
| ConnectionAudit connectionAudit; |
| |
| ConnectionConfig(const Pointer<transport::Transport> transport, |
| const Pointer<decaf::util::Properties> properties) : |
| properties(properties), |
| transport(transport), |
| clientIdGenerator(), |
| scheduler(), |
| executor(), |
| sessionIds(), |
| consumerIdGenerator(), |
| tempDestinationIds(), |
| localTransactionIds(), |
| brokerURL(""), |
| clientIDSet(false), |
| isConnectionInfoSentToBroker(false), |
| userSpecifiedClientID(false), |
| ensureConnectionInfoSentMutex(), |
| onExceptionLock(), |
| mutex(), |
| dispatchAsync(true), |
| alwaysSyncSend(false), |
| useAsyncSend(false), |
| sendAcksAsync(true), |
| messagePrioritySupported(false), |
| watchTopicAdvisories(true), |
| useCompression(false), |
| useRetroactiveConsumer(false), |
| checkForDuplicates(true), |
| optimizeAcknowledge(false), |
| exclusiveConsumer(false), |
| transactedIndividualAck(false), |
| nonBlockingRedelivery(false), |
| alwaysSessionAsync(true), |
| compressionLevel(-1), |
| sendTimeout(0), |
| closeTimeout(15000), |
| producerWindowSize(0), |
| auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE), |
| auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT), |
| optimizeAcknowledgeTimeOut(300), |
| optimizedAckScheduledAckInterval(0), |
| consumerFailoverRedeliveryWaitPeriod(0), |
| consumerExpiryCheckEnabled(true), |
| defaultPrefetchPolicy(NULL), |
| defaultRedeliveryPolicy(NULL), |
| exceptionListener(NULL), |
| transformer(NULL), |
| connectionInfo(), |
| brokerInfo(), |
| brokerWireFormatInfo(), |
| transportInterruptionProcessingComplete(), |
| brokerInfoReceived(), |
| advisoryConsumer(), |
| firstFailureError(), |
| dispatchers(), |
| activeProducers(), |
| sessionsLock(), |
| activeSessions(), |
| transportListeners(), |
| activeTempDestinations() { |
| |
| this->defaultPrefetchPolicy.reset(new DefaultPrefetchPolicy()); |
| this->defaultRedeliveryPolicy.reset(new DefaultRedeliveryPolicy()); |
| this->clientIdGenerator.reset(new util::IdGenerator); |
| this->connectionInfo.reset(new ConnectionInfo()); |
| this->brokerInfoReceived.reset(new CountDownLatch(1)); |
| |
| // Generate a connectionId |
| std::string uniqueId = CONNECTION_ID_GENERATOR.generateId(); |
| decaf::lang::Pointer<ConnectionId> connectionId(new ConnectionId()); |
| connectionId->setValue(uniqueId); |
| |
| this->transportInterruptionProcessingComplete.reset(new AtomicInteger()); |
| this->protocolVersion.reset(new AtomicInteger(OpenWireFormat::MAX_SUPPORTED_VERSION)); |
| this->executor.reset( |
| new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS, |
| new LinkedBlockingQueue<Runnable*>(), |
| new ConnectionThreadFactory(connectionId->toString()))); |
| |
| this->connectionInfo->setConnectionId(connectionId); |
| this->scheduler.reset(new Scheduler(std::string("ActiveMQConnection[")+uniqueId+"] Scheduler")); |
| this->scheduler->start(); |
| } |
| |
| ~ConnectionConfig() { |
| try { |
| synchronized(&onExceptionLock) { |
| this->scheduler->shutdown(); |
| this->executor->shutdown(); |
| this->executor->awaitTermination(10, TimeUnit::MINUTES); |
| } |
| } |
| AMQ_CATCHALL_NOTHROW() |
| } |
| |
| void waitForBrokerInfo() { |
| this->brokerInfoReceived->await(); |
| } |
| }; |
| |
| // Static init. |
| util::IdGenerator ConnectionConfig::CONNECTION_ID_GENERATOR; |
| DefaultTransportListener ConnectionConfig::DO_NOTHING_TRANSPORT_LISTENER; |
| |
| class ConnectionErrorRunnable : public Runnable { |
| private: |
| |
| ActiveMQConnection* connection; |
| Pointer<ConnectionError> error; |
| |
| private: |
| |
| ConnectionErrorRunnable(const ConnectionErrorRunnable&); |
| ConnectionErrorRunnable& operator= (const ConnectionErrorRunnable&); |
| |
| public: |
| |
| ConnectionErrorRunnable(ActiveMQConnection* connection, Pointer<ConnectionError> error) : |
| Runnable(), connection(connection), error(error) {} |
| virtual ~ConnectionErrorRunnable() {} |
| |
| virtual void run() { |
| try { |
| if (error != NULL && error->getException() != NULL) { |
| this->connection->onAsyncException(error->getException()->createExceptionObject()); |
| } |
| } catch(Exception& ex) {} |
| } |
| }; |
| |
| class OnAsyncExceptionRunnable : public Runnable { |
| private: |
| |
| ActiveMQConnection* connection; |
| Exception ex; |
| |
| private: |
| |
| OnAsyncExceptionRunnable(const OnAsyncExceptionRunnable&); |
| OnAsyncExceptionRunnable& operator= (const OnAsyncExceptionRunnable&); |
| |
| public: |
| |
| OnAsyncExceptionRunnable(ActiveMQConnection* connection, const Exception& ex) : |
| Runnable(), connection(connection), ex(ex) {} |
| virtual ~OnAsyncExceptionRunnable() {} |
| |
| virtual void run() { |
| try { |
| cms::ExceptionListener* listener = this->connection->getExceptionListener(); |
| if (listener != NULL) { |
| |
| const cms::CMSException* cause = dynamic_cast<const cms::CMSException*>(ex.getCause()); |
| if (cause != NULL) { |
| listener->onException(*cause); |
| } else { |
| ActiveMQException amqEx(ex); |
| listener->onException(amqEx.convertToCMSException()); |
| } |
| } |
| } catch(Exception& ex) {} |
| } |
| }; |
| |
| class OnExceptionRunnable : public Runnable { |
| private: |
| |
| ActiveMQConnection* connection; |
| ConnectionConfig* config; |
| Pointer<Exception> ex; |
| |
| private: |
| |
| OnExceptionRunnable(const OnExceptionRunnable&); |
| OnExceptionRunnable& operator= (const OnExceptionRunnable&); |
| |
| public: |
| |
| OnExceptionRunnable(ActiveMQConnection* connection, ConnectionConfig* config, Exception* ex) : |
| Runnable(), connection(connection), config(config), ex(ex) {} |
| virtual ~OnExceptionRunnable() {} |
| |
| virtual void run() { |
| try { |
| |
| // Take control of this pointer, it will be given to the Connection who |
| // will destroy it when it closes. |
| Exception* error = ex.release(); |
| |
| // Mark this Connection as having a Failed transport. |
| this->connection->setFirstFailureError(error); |
| |
| Pointer<Transport> transport = this->config->transport; |
| if (transport != NULL) { |
| try { |
| transport->stop(); |
| } catch(...) { |
| } |
| } |
| |
| this->config->brokerInfoReceived->countDown(); |
| |
| // Clean up the Connection resources. |
| this->connection->cleanup(); |
| |
| synchronized(&this->config->transportListeners) { |
| Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() ); |
| |
| while (iter->hasNext()) { |
| try { |
| iter->next()->onException(*error); |
| } catch(...) {} |
| } |
| } |
| } catch(Exception& ex) {} |
| } |
| }; |
| |
| class AsyncResponseCallback : public ResponseCallback { |
| private: |
| |
| ConnectionConfig* config; |
| cms::AsyncCallback* callback; |
| |
| private: |
| |
| AsyncResponseCallback(const AsyncResponseCallback&); |
| AsyncResponseCallback& operator= (const AsyncResponseCallback&); |
| |
| public: |
| |
| AsyncResponseCallback(ConnectionConfig* config, cms::AsyncCallback* callback) : |
| ResponseCallback(), config(config), callback(callback) { |
| } |
| |
| virtual ~AsyncResponseCallback() { |
| } |
| |
| virtual void onComplete(Pointer<commands::Response> response) { |
| |
| commands::ExceptionResponse* exceptionResponse = |
| dynamic_cast<ExceptionResponse*> (response.get()); |
| |
| if (exceptionResponse != NULL) { |
| |
| Exception ex = exceptionResponse->getException()->createExceptionObject(); |
| const cms::CMSException* cmsError = dynamic_cast<const cms::CMSException*>(ex.getCause()); |
| if (cmsError != NULL) { |
| this->callback->onException(*cmsError); |
| } else { |
| BrokerException error = BrokerException(__FILE__, __LINE__, exceptionResponse->getException()->getMessage().c_str()); |
| this->callback->onException(error.convertToCMSException()); |
| } |
| } else { |
| this->callback->onSuccess(); |
| } |
| } |
| }; |
| |
| }} |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport> transport, |
| const Pointer<decaf::util::Properties> properties) : |
| config(NULL), connectionMetaData(new ActiveMQConnectionMetaData()), started(false), |
| closed(false), closing(false), transportFailed(false) { |
| |
| Pointer<ConnectionConfig> configuration( |
| new ConnectionConfig(transport, properties)); |
| |
| // Register for messages and exceptions from the connector. |
| transport->setTransportListener(this); |
| |
| // Set the initial state of the ConnectionInfo |
| configuration->connectionInfo->setManageable(true); |
| configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant()); |
| |
| configuration->connectionAudit.setCheckForDuplicates(transport->isFaultTolerant()); |
| |
| this->config = configuration.release(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnection::~ActiveMQConnection() { |
| |
| try { |
| this->close(); |
| } |
| AMQ_CATCHALL_NOTHROW() |
| |
| try { |
| // This must happen even if exceptions occur in the Close attempt. |
| delete this->config; |
| } |
| AMQ_CATCHALL_NOTHROW() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::addDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) { |
| |
| try { |
| synchronized(&this->config->dispatchers) { |
| this->config->dispatchers.put(consumer, dispatcher); |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer) { |
| |
| try { |
| synchronized(&this->config->dispatchers) { |
| this->config->dispatchers.remove(consumer); |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Session* ActiveMQConnection::createSession() { |
| try { |
| return createSession(Session::AUTO_ACKNOWLEDGE); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Session* ActiveMQConnection::createSession(cms::Session::AcknowledgeMode ackMode) { |
| |
| try { |
| |
| checkClosedOrFailed(); |
| ensureConnectionInfoSent(); |
| |
| // Create the session instance as a Session Kernel we then create and return a |
| // ActiveMQSession instance that acts as a proxy to the kernel caller can delete |
| // that at any time since we only refer to the Pointer to the session kernel. |
| Pointer<ActiveMQSessionKernel> session( |
| new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties)); |
| |
| session->setMessageTransformer(this->config->transformer); |
| |
| this->addSession(session); |
| |
| return new ActiveMQSession(session); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<SessionId> ActiveMQConnection::getNextSessionId() { |
| |
| decaf::lang::Pointer<SessionId> sessionId(new SessionId()); |
| sessionId->setConnectionId(this->config->connectionInfo->getConnectionId()->getValue()); |
| sessionId->setValue(this->config->sessionIds.getNextSequenceId()); |
| |
| return sessionId; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::addSession(Pointer<ActiveMQSessionKernel> session) { |
| try { |
| this->config->sessionsLock.writeLock().lock(); |
| try { |
| this->config->activeSessions.add(session); |
| this->config->sessionsLock.writeLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->sessionsLock.writeLock().unlock(); |
| throw; |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeSession(Pointer<ActiveMQSessionKernel> session) { |
| try { |
| this->config->sessionsLock.writeLock().lock(); |
| try { |
| this->config->activeSessions.remove(session); |
| this->config->connectionAudit.removeDispatcher(session.get()); |
| this->config->sessionsLock.writeLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->sessionsLock.writeLock().unlock(); |
| throw; |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::addProducer(Pointer<ActiveMQProducerKernel> producer) { |
| |
| try { |
| synchronized(&this->config->activeProducers) { |
| this->config->activeProducers.put(producer->getProducerInfo()->getProducerId(), producer); |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeProducer(const decaf::lang::Pointer<ProducerId>& producerId) { |
| |
| try { |
| synchronized(&this->config->activeProducers) { |
| this->config->activeProducers.remove(producerId); |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| std::string ActiveMQConnection::getClientID() const { |
| |
| if (this->isClosed()) { |
| return ""; |
| } |
| |
| return this->config->connectionInfo->getClientId(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setClientID(const std::string& clientID) { |
| |
| if (this->closed.get()) { |
| throw cms::IllegalStateException("Connection is already closed", NULL); |
| } |
| |
| if (this->config->clientIDSet) { |
| throw cms::IllegalStateException("Client ID is already set", NULL); |
| } |
| |
| if (this->config->isConnectionInfoSentToBroker) { |
| throw cms::IllegalStateException("Cannot set client Id on a Connection already in use.", NULL); |
| } |
| |
| if (clientID.empty()) { |
| throw cms::InvalidClientIdException("Client ID cannot be an empty string", NULL); |
| } |
| |
| this->config->connectionInfo->setClientId(clientID); |
| this->config->userSpecifiedClientID = true; |
| |
| try { |
| ensureConnectionInfoSent(); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setDefaultClientId(const std::string& clientId) { |
| this->setClientID(clientId); |
| this->config->userSpecifiedClientID = true; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::close() { |
| |
| try { |
| |
| if (this->isClosed()) { |
| return; |
| } |
| |
| Exception ex; |
| bool hasException = false; |
| |
| // If we are running lets stop first. |
| if (!this->transportFailed.get()) { |
| try { |
| this->stop(); |
| } catch (cms::CMSException& error) { |
| if (!hasException) { |
| ex = ActiveMQException(error.clone()); |
| hasException = true; |
| } |
| } |
| } |
| |
| // Indicates we are on the way out to suppress any exceptions getting |
| // passed on from the transport as it goes down. |
| this->closing.set(true); |
| |
| if (this->config->scheduler != NULL) { |
| try { |
| this->config->scheduler->stop(); |
| } catch (Exception& error) { |
| if (!hasException) { |
| ex = error; |
| ex.setMark(__FILE__, __LINE__); |
| hasException = true; |
| } |
| } |
| } |
| |
| long long lastDeliveredSequenceId = -1; |
| |
| // Get the complete list of active sessions. |
| try { |
| this->config->sessionsLock.writeLock().lock(); |
| |
| // We need to use a copy since we aren't able to use CopyOnWriteArrayList |
| ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions); |
| std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator()); |
| |
| // Dispose of all the Session resources we know are still open. |
| while (iter->hasNext()) { |
| Pointer<ActiveMQSessionKernel> session = iter->next(); |
| try { |
| session->dispose(); |
| lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId()); |
| } catch (cms::CMSException& ex) { |
| } |
| } |
| |
| this->config->activeSessions.clear(); |
| this->config->sessionsLock.writeLock().unlock(); |
| } catch (Exception& error) { |
| this->config->sessionsLock.writeLock().unlock(); |
| if (!hasException) { |
| ex = error; |
| ex.setMark(__FILE__, __LINE__); |
| hasException = true; |
| } |
| } |
| |
| // As TemporaryQueue and TemporaryTopic instances are bound to a connection |
| // we should just delete them after the connection is closed to free up memory |
| if (this->config->advisoryConsumer != NULL) { |
| this->config->advisoryConsumer->dispose(); |
| } |
| |
| ArrayList<Pointer<ActiveMQTempDestination> > tempDests(this->config->activeTempDestinations.values()); |
| Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(tempDests.iterator()); |
| |
| try { |
| while (iterator->hasNext()) { |
| Pointer<ActiveMQTempDestination> dest = iterator->next(); |
| dest->close(); |
| } |
| } catch (cms::CMSException& error) { |
| if (!hasException) { |
| ex = ActiveMQException(error.clone()); |
| hasException = true; |
| } |
| } |
| |
| try { |
| if (this->config->executor != NULL) { |
| this->config->executor->shutdown(); |
| } |
| } catch (Exception& error) { |
| if (!hasException) { |
| ex = error; |
| ex.setMark(__FILE__, __LINE__); |
| hasException = true; |
| } |
| } |
| |
| // Now inform the Broker we are shutting down. |
| try { |
| this->disconnect(lastDeliveredSequenceId); |
| } catch (Exception& error) { |
| if (!hasException) { |
| ex = error; |
| ex.setMark(__FILE__, __LINE__); |
| hasException = true; |
| } |
| } |
| |
| // Once current deliveries are done this stops the delivery |
| // of any new messages. |
| this->started.set(false); |
| this->closed.set(true); |
| |
| if (hasException) { |
| throw ex; |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::cleanup() { |
| |
| try { |
| |
| this->config->sessionsLock.writeLock().lock(); |
| try { |
| // We need to use a copy since we aren't able to use CopyOnWriteArrayList |
| ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions); |
| std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator()); |
| |
| // Dispose of all the Session resources we know are still open. |
| while (iter->hasNext()) { |
| Pointer<ActiveMQSessionKernel> session = iter->next(); |
| try { |
| session->dispose(); |
| } catch (cms::CMSException& ex) { |
| /* Absorb */ |
| } |
| } |
| this->config->activeSessions.clear(); |
| this->config->sessionsLock.writeLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->sessionsLock.writeLock().unlock(); |
| throw; |
| } |
| |
| if (this->config->isConnectionInfoSentToBroker) { |
| if (!transportFailed.get() && !closing.get()) { |
| this->syncRequest(this->config->connectionInfo->createRemoveCommand()); |
| } |
| this->config->isConnectionInfoSentToBroker = false; |
| } |
| |
| if (this->config->userSpecifiedClientID) { |
| this->config->connectionInfo->setClientId(""); |
| this->config->userSpecifiedClientID = false; |
| } |
| |
| this->config->clientIDSet = false; |
| this->started.set(false); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::start() { |
| |
| try { |
| |
| checkClosedOrFailed(); |
| ensureConnectionInfoSent(); |
| |
| try { |
| // This starts or restarts the delivery of all incoming messages |
| // messages delivered while this connection is stopped are dropped |
| // and not acknowledged. |
| if (this->started.compareAndSet(false, true)) { |
| this->config->sessionsLock.readLock().lock(); |
| |
| // Start all the sessions. |
| std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator()); |
| while (iter->hasNext()) { |
| iter->next()->start(); |
| } |
| |
| this->config->sessionsLock.readLock().unlock(); |
| } |
| } catch (Exception& ex) { |
| this->config->sessionsLock.readLock().unlock(); |
| throw; |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::stop() { |
| |
| try { |
| |
| checkClosedOrFailed(); |
| |
| try { |
| // Once current deliveries are done this stops the delivery of any |
| // new messages. |
| if (this->started.compareAndSet(true, false)) { |
| this->config->sessionsLock.readLock().lock(); |
| std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator()); |
| |
| while (iter->hasNext()) { |
| iter->next()->stop(); |
| } |
| this->config->sessionsLock.readLock().unlock(); |
| } |
| } catch (Exception& ex) { |
| this->config->sessionsLock.readLock().unlock(); |
| throw; |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::disconnect(long long lastDeliveredSequenceId) { |
| |
| try { |
| |
| // Clear the listener, we don't care about async errors at this point. |
| this->config->transport->setTransportListener(&ConnectionConfig::DO_NOTHING_TRANSPORT_LISTENER); |
| |
| // Allow the Support class to shutdown its resources, including the Transport. |
| bool hasException = false; |
| exceptions::ActiveMQException e; |
| |
| if (this->config->isConnectionInfoSentToBroker) { |
| |
| try { |
| // Remove our ConnectionId from the Broker |
| Pointer<RemoveInfo> command(this->config->connectionInfo->createRemoveCommand()); |
| command->setLastDeliveredSequenceId(lastDeliveredSequenceId); |
| this->syncRequest(command, this->config->closeTimeout); |
| } catch (exceptions::ActiveMQException& ex) { |
| if (!hasException) { |
| hasException = true; |
| ex.setMark(__FILE__, __LINE__); |
| e = ex; |
| } |
| } |
| |
| try { |
| // Send the disconnect command to the broker. |
| Pointer<ShutdownInfo> shutdown(new ShutdownInfo()); |
| oneway(shutdown); |
| } catch (exceptions::ActiveMQException& ex) { |
| if (!hasException) { |
| hasException = true; |
| ex.setMark(__FILE__, __LINE__); |
| e = ex; |
| } |
| } |
| } |
| |
| if (this->config->transport != NULL) { |
| |
| try { |
| this->config->transport->close(); |
| } catch (exceptions::ActiveMQException& ex) { |
| if (!hasException) { |
| hasException = true; |
| ex.setMark(__FILE__, __LINE__); |
| e = ex; |
| } |
| } |
| } |
| |
| // If we encountered an exception - throw the first one we encountered. |
| // This will preserve the stack trace for logging purposes. |
| if (hasException) { |
| throw e; |
| } |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::sendPullRequest(const ConsumerInfo* consumer, long long timeout) { |
| |
| try { |
| |
| if (consumer->getPrefetchSize() == 0) { |
| |
| Pointer<MessagePull> messagePull(new MessagePull()); |
| messagePull->setConsumerId(consumer->getConsumerId()); |
| messagePull->setDestination(consumer->getDestination()); |
| messagePull->setTimeout(timeout); |
| |
| this->oneway(messagePull); |
| } |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::destroyDestination(const ActiveMQDestination* destination) { |
| |
| try { |
| |
| if (destination == NULL) { |
| throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL"); |
| } |
| |
| checkClosedOrFailed(); |
| ensureConnectionInfoSent(); |
| |
| Pointer<DestinationInfo> command(new DestinationInfo()); |
| |
| command->setConnectionId(this->config->connectionInfo->getConnectionId()); |
| command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION); |
| command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure())); |
| |
| // Send the message to the broker. |
| syncRequest(command); |
| } |
| AMQ_CATCH_RETHROW(NullPointerException) |
| AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::destroyDestination(const cms::Destination* destination) { |
| |
| try { |
| |
| if (destination == NULL) { |
| throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL"); |
| } |
| |
| checkClosedOrFailed(); |
| ensureConnectionInfoSent(); |
| |
| const ActiveMQDestination* amqDestination = dynamic_cast<const ActiveMQDestination*>(destination); |
| |
| this->destroyDestination(amqDestination); |
| } |
| AMQ_CATCH_RETHROW(NullPointerException) |
| AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onCommand(const Pointer<Command> command) { |
| |
| try { |
| |
| if (command->isMessageDispatch()) { |
| |
| Pointer<MessageDispatch> dispatch = command.dynamicCast<MessageDispatch>(); |
| |
| // Check first to see if we are recovering. |
| waitForTransportInterruptionProcessingToComplete(); |
| |
| // Look up the dispatcher. |
| Dispatcher* dispatcher = NULL; |
| synchronized(&this->config->dispatchers) { |
| |
| dispatcher = this->config->dispatchers.get(dispatch->getConsumerId()); |
| |
| // If we have no registered dispatcher, the consumer was probably |
| // just closed. |
| if (dispatcher != NULL) { |
| |
| Pointer<commands::Message> message = dispatch->getMessage(); |
| |
| // Message == NULL to signal the end of a Queue Browse. |
| if (message != NULL) { |
| message->setReadOnlyBody(true); |
| message->setReadOnlyProperties(true); |
| message->setRedeliveryCounter(dispatch->getRedeliveryCounter()); |
| message->setConnection(this); |
| } |
| |
| dispatcher->dispatch(dispatch); |
| } |
| } |
| |
| } else if (command->isProducerAck()) { |
| |
| ProducerAck* producerAck = dynamic_cast<ProducerAck*>(command.get()); |
| |
| // Get the consumer info object for this consumer. |
| Pointer<ActiveMQProducerKernel> producer; |
| synchronized(&this->config->activeProducers) { |
| producer = this->config->activeProducers.get(producerAck->getProducerId()); |
| if (producer != NULL) { |
| producer->onProducerAck(*producerAck); |
| } |
| } |
| |
| } else if (command->isWireFormatInfo()) { |
| this->onWireFormatInfo(command); |
| } else if (command->isBrokerInfo()) { |
| this->config->brokerInfo = command.dynamicCast<BrokerInfo>(); |
| this->config->brokerInfoReceived->countDown(); |
| } else if (command->isConnectionControl()) { |
| this->onConnectionControl(command); |
| } else if (command->isControlCommand()) { |
| this->onControlCommand(command); |
| } else if (command->isConnectionError()) { |
| |
| Pointer<ConnectionError> connectionError = command.dynamicCast<ConnectionError>(); |
| this->config->executor->execute(new ConnectionErrorRunnable(this, connectionError)); |
| |
| } else if (command->isConsumerControl()) { |
| this->onConsumerControl(command); |
| } |
| |
| synchronized(&this->config->transportListeners) { |
| Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator()); |
| while (iter->hasNext()) { |
| try { |
| iter->next()->onCommand(command); |
| } catch (...) { |
| } |
| } |
| } |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onWireFormatInfo(Pointer<commands::Command> command AMQCPP_UNUSED) { |
| this->config->brokerWireFormatInfo = command.dynamicCast<WireFormatInfo>(); |
| this->config->protocolVersion->set(this->config->brokerWireFormatInfo->getVersion()); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onControlCommand(Pointer<commands::Command> command AMQCPP_UNUSED) { |
| // Don't need to do anything yet as close and shutdown are applicable yet. |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onConnectionControl(Pointer<commands::Command> command AMQCPP_UNUSED) { |
| // Don't need to do anything yet as we don't support optimizeAcknowledge. |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onConsumerControl(Pointer<commands::Command> command) { |
| |
| Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>(); |
| |
| this->config->sessionsLock.readLock().lock(); |
| try { |
| // Get the complete list of active sessions. |
| std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQSessionKernel> session = iter->next(); |
| if (consumerControl->isClose()) { |
| session->close(consumerControl->getConsumerId()); |
| } else { |
| session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch()); |
| } |
| } |
| this->config->sessionsLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->sessionsLock.readLock().unlock(); |
| throw; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onException(const decaf::lang::Exception& ex) { |
| |
| try { |
| |
| // Sync with the config destructor in case a client attempt to |
| synchronized(&this->config->onExceptionLock) { |
| onAsyncException(ex); |
| |
| // We're disconnected - the asynchronous error is expected. |
| if (!this->isClosed() || !this->closing.get()) { |
| this->config->executor->execute(new OnExceptionRunnable(this, config, ex.clone())); |
| } |
| } |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onAsyncException(const decaf::lang::Exception& ex) { |
| |
| if (!this->isClosed() || !this->closing.get()) { |
| |
| if (this->config->exceptionListener != NULL) { |
| this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex)); |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onClientInternalException(const decaf::lang::Exception& ex) { |
| |
| if (!closed.get() && !closing.get()) { |
| |
| if (this->config->exceptionListener != NULL) { |
| this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex)); |
| } |
| |
| // TODO Turn this into an invocation on a special ClientInternalExceptionListener |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::transportInterrupted() { |
| |
| this->config->transportInterruptionProcessingComplete->set(0); |
| |
| this->config->sessionsLock.readLock().lock(); |
| try { |
| std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator()); |
| while (sessions->hasNext()) { |
| sessions->next()->clearMessagesInProgress(this->config->transportInterruptionProcessingComplete); |
| } |
| this->config->sessionsLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->sessionsLock.readLock().unlock(); |
| throw; |
| } |
| |
| synchronized(&this->config->transportListeners) { |
| Pointer<Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator()); |
| while (listeners->hasNext()) { |
| try { |
| listeners->next()->transportInterrupted(); |
| } catch (...) { |
| } |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::transportResumed() { |
| |
| synchronized(&this->config->transportListeners) { |
| Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator()); |
| while (iter->hasNext()) { |
| try { |
| iter->next()->transportResumed(); |
| } catch (...) { |
| } |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::oneway(Pointer<Command> command) { |
| |
| try { |
| checkClosedOrFailed(); |
| this->config->transport->oneway(command); |
| } |
| AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<Response> ActiveMQConnection::syncRequest(Pointer<Command> command, unsigned int timeout) { |
| |
| try { |
| |
| checkClosedOrFailed(); |
| |
| Pointer<Response> response; |
| |
| if (timeout == 0) { |
| response = this->config->transport->request(command); |
| } else { |
| response = this->config->transport->request(command, timeout); |
| } |
| |
| commands::ExceptionResponse* exceptionResponse = dynamic_cast<ExceptionResponse*>(response.get()); |
| |
| if (exceptionResponse != NULL) { |
| throw exceptionResponse->getException()->createExceptionObject(); |
| } |
| |
| return response; |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::asyncRequest(Pointer<Command> command, cms::AsyncCallback* onComplete) { |
| |
| try { |
| |
| if (onComplete == NULL) { |
| this->syncRequest(command); |
| return; |
| } |
| |
| checkClosedOrFailed(); |
| |
| Pointer<ResponseCallback> callback(new AsyncResponseCallback(this->config, onComplete)); |
| this->config->transport->asyncRequest(command, callback); |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::checkClosed() const { |
| if (this->isClosed()) { |
| throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConnection::enforceConnected - Connection has already been closed!"); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::checkClosedOrFailed() const { |
| |
| checkClosed(); |
| if (this->transportFailed.get() == true) { |
| throw ConnectionFailedException(*this->config->firstFailureError); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::ensureConnectionInfoSent() { |
| |
| try { |
| |
| // Can we skip sending the ConnectionInfo packet, cheap test |
| if (this->config->isConnectionInfoSentToBroker || closed.get()) { |
| return; |
| } |
| |
| synchronized(&( this->config->ensureConnectionInfoSentMutex)) { |
| |
| // Can we skip sending the ConnectionInfo packet?? |
| if (this->config->isConnectionInfoSentToBroker || closed.get()) { |
| return; |
| } |
| |
| // check for a user specified Id |
| if (!this->config->userSpecifiedClientID) { |
| this->config->connectionInfo->setClientId(this->config->clientIdGenerator->generateId()); |
| } |
| |
| // Now we ping the broker and see if we get an ack / nack |
| syncRequest(this->config->connectionInfo); |
| |
| this->config->isConnectionInfoSentToBroker = true; |
| |
| Pointer<SessionId> sessionId(new SessionId(this->config->connectionInfo->getConnectionId().get(), -1)); |
| Pointer<ConsumerId> consumerId(new ConsumerId(*sessionId, this->config->consumerIdGenerator.getNextSequenceId())); |
| if (this->config->watchTopicAdvisories) { |
| this->config->advisoryConsumer.reset(new AdvisoryConsumer(this, consumerId)); |
| } |
| } |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::fire(const ActiveMQException& ex) { |
| if (this->config->exceptionListener != NULL) { |
| try { |
| this->config->exceptionListener->onException(ex.convertToCMSException()); |
| } catch (...) { |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const ConnectionInfo& ActiveMQConnection::getConnectionInfo() const { |
| checkClosed(); |
| return *this->config->connectionInfo; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const ConnectionId& ActiveMQConnection::getConnectionId() const { |
| checkClosed(); |
| return *(this->config->connectionInfo->getConnectionId()); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::addTransportListener(TransportListener* transportListener) { |
| |
| if (transportListener == NULL) { |
| return; |
| } |
| |
| // Add this listener from the set of active TransportListeners |
| synchronized(&this->config->transportListeners) { |
| this->config->transportListeners.add(transportListener); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeTransportListener(TransportListener* transportListener) { |
| |
| if (transportListener == NULL) { |
| return; |
| } |
| |
| // Remove this listener from the set of active TransportListeners |
| synchronized(&this->config->transportListeners) { |
| this->config->transportListeners.remove(transportListener); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete() { |
| |
| while (!closed.get() && !transportFailed.get() && this->config->transportInterruptionProcessingComplete->get() > 0) { |
| signalInterruptionProcessingComplete(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setTransportInterruptionProcessingComplete() { |
| if (this->config->transportInterruptionProcessingComplete->decrementAndGet() == 0) { |
| signalInterruptionProcessingComplete(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::signalInterruptionProcessingComplete() { |
| |
| FailoverTransport* failoverTransport = |
| dynamic_cast<FailoverTransport*>(this->config->transport->narrow(typeid(FailoverTransport))); |
| |
| if (failoverTransport != NULL) { |
| failoverTransport->setConnectionInterruptProcessingComplete( |
| this->config->connectionInfo->getConnectionId()); |
| } |
| |
| this->config->transportInterruptionProcessingComplete->set(0); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setUsername(const std::string& username) { |
| this->config->connectionInfo->setUserName(username); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const std::string& ActiveMQConnection::getUsername() const { |
| return this->config->connectionInfo->getUserName(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setPassword(const std::string& password) { |
| this->config->connectionInfo->setPassword(password); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const std::string& ActiveMQConnection::getPassword() const { |
| return this->config->connectionInfo->getPassword(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setBrokerURL(const std::string& brokerURL) { |
| this->config->brokerURL = brokerURL; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const std::string& ActiveMQConnection::getBrokerURL() const { |
| return this->config->brokerURL; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setExceptionListener(cms::ExceptionListener* listener) { |
| this->config->exceptionListener = listener; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::ExceptionListener* ActiveMQConnection::getExceptionListener() const { |
| return this->config->exceptionListener; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setMessageTransformer(cms::MessageTransformer* transformer) { |
| this->config->transformer = transformer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageTransformer* ActiveMQConnection::getMessageTransformer() const { |
| return this->config->transformer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::DestinationSource* ActiveMQConnection::getDestinationSource() { |
| return new ActiveMQDestinationSource(this); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setPrefetchPolicy(PrefetchPolicy* policy) { |
| this->config->defaultPrefetchPolicy.reset(policy); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| PrefetchPolicy* ActiveMQConnection::getPrefetchPolicy() const { |
| return this->config->defaultPrefetchPolicy.get(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setRedeliveryPolicy(RedeliveryPolicy* policy) { |
| this->config->defaultRedeliveryPolicy.reset(policy); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| RedeliveryPolicy* ActiveMQConnection::getRedeliveryPolicy() const { |
| return this->config->defaultRedeliveryPolicy.get(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isDispatchAsync() const { |
| return this->config->dispatchAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setDispatchAsync(bool value) { |
| this->config->dispatchAsync = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isAlwaysSyncSend() const { |
| return this->config->alwaysSyncSend; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setAlwaysSyncSend(bool value) { |
| this->config->alwaysSyncSend = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isUseAsyncSend() const { |
| return this->config->useAsyncSend; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setUseAsyncSend(bool value) { |
| this->config->useAsyncSend = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isUseCompression() const { |
| return this->config->useCompression; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setUseCompression(bool value) { |
| this->config->useCompression = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQConnection::getCompressionLevel() const { |
| return this->config->compressionLevel; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setCompressionLevel(int value) { |
| |
| if (value < 0) { |
| this->config->compressionLevel = -1; |
| } |
| |
| this->config->compressionLevel = Math::min(value, 9); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| unsigned int ActiveMQConnection::getSendTimeout() const { |
| return this->config->sendTimeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setSendTimeout(unsigned int timeout) { |
| this->config->sendTimeout = timeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| unsigned int ActiveMQConnection::getCloseTimeout() const { |
| return this->config->closeTimeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setCloseTimeout(unsigned int timeout) { |
| this->config->closeTimeout = timeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| unsigned int ActiveMQConnection::getProducerWindowSize() const { |
| return this->config->producerWindowSize; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setProducerWindowSize(unsigned int windowSize) { |
| this->config->producerWindowSize = windowSize; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnection::getNextTempDestinationId() { |
| return this->config->tempDestinationIds.getNextSequenceId(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnection::getNextLocalTransactionId() { |
| return this->config->localTransactionIds.getNextSequenceId(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| transport::Transport& ActiveMQConnection::getTransport() const { |
| return *(this->config->transport); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<Scheduler> ActiveMQConnection::getScheduler() const { |
| return this->config->scheduler; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isMessagePrioritySupported() const { |
| return this->config->messagePrioritySupported; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setMessagePrioritySupported(bool value) { |
| this->config->messagePrioritySupported = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setFirstFailureError(decaf::lang::Exception* error) { |
| |
| this->transportFailed.set(true); |
| |
| if (this->config->firstFailureError == NULL) { |
| this->config->firstFailureError.reset(error); |
| } else { |
| delete error; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| decaf::lang::Exception* ActiveMQConnection::getFirstFailureError() const { |
| return this->config->firstFailureError.get(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| std::string ActiveMQConnection::getResourceManagerId() const { |
| try { |
| this->config->waitForBrokerInfo(); |
| |
| if (this->config->brokerInfo == NULL) { |
| throw CMSException("Connection failed before Broker info was received."); |
| } |
| |
| return this->config->brokerInfo->getBrokerId()->getValue(); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const decaf::util::Properties& ActiveMQConnection::getProperties() const { |
| return *(this->config->properties); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ExecutorService* ActiveMQConnection::getExecutor() const { |
| return this->config->executor.get(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ArrayList< Pointer<ActiveMQSessionKernel> > ActiveMQConnection::getSessions() const { |
| ArrayList< Pointer<ActiveMQSessionKernel> > result; |
| |
| this->config->sessionsLock.readLock().lock(); |
| try { |
| result.addAll(this->config->activeSessions); |
| this->config->sessionsLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->sessionsLock.readLock().unlock(); |
| throw; |
| } |
| |
| return result; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isWatchTopicAdvisories() const { |
| return this->config->watchTopicAdvisories; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setWatchTopicAdvisories(bool value) { |
| this->config->watchTopicAdvisories = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQConnection::getAuditDepth() const { |
| return this->config->auditDepth; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setAuditDepth(int auditDepth) { |
| this->config->auditDepth = auditDepth; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQConnection::getAuditMaximumProducerNumber() const { |
| return this->config->auditMaximumProducerNumber; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { |
| this->config->auditMaximumProducerNumber = auditMaximumProducerNumber; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isCheckForDuplicates() const { |
| return this->config->checkForDuplicates; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setCheckForDuplicates(bool checkForDuplicates) { |
| this->config->checkForDuplicates = checkForDuplicates; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isSendAcksAsync() const { |
| return this->config->sendAcksAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setSendAcksAsync(bool sendAcksAsync) { |
| this->config->sendAcksAsync = sendAcksAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isTransactedIndividualAck() const { |
| return this->config->transactedIndividualAck; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setTransactedIndividualAck(bool transactedIndividualAck) { |
| this->config->transactedIndividualAck = transactedIndividualAck; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isNonBlockingRedelivery() const { |
| return this->config->nonBlockingRedelivery; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setNonBlockingRedelivery(bool nonBlockingRedelivery) { |
| this->config->nonBlockingRedelivery = nonBlockingRedelivery; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isOptimizeAcknowledge() const { |
| return this->config->optimizeAcknowledge; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setOptimizeAcknowledge(bool optimizeAcknowledge) { |
| this->config->optimizeAcknowledge = optimizeAcknowledge; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnection::getOptimizeAcknowledgeTimeOut() const { |
| return this->config->optimizeAcknowledgeTimeOut; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut) { |
| this->config->optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnection::getOptimizedAckScheduledAckInterval() const { |
| return this->config->optimizedAckScheduledAckInterval; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval) { |
| this->config->optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnection::getConsumerFailoverRedeliveryWaitPeriod() const { |
| return this->config->consumerFailoverRedeliveryWaitPeriod; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setConsumerFailoverRedeliveryWaitPeriod(long long value) { |
| this->config->consumerFailoverRedeliveryWaitPeriod = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isUseRetroactiveConsumer() const { |
| return this->config->useRetroactiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setUseRetroactiveConsumer(bool useRetroactiveConsumer) { |
| this->config->useRetroactiveConsumer = useRetroactiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isExclusiveConsumer() const { |
| return this->config->exclusiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setExclusiveConsumer(bool exclusiveConsumer) { |
| this->config->exclusiveConsumer = exclusiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::addTempDestination(Pointer<ActiveMQTempDestination> destination) { |
| this->config->activeTempDestinations.put(destination, destination); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeTempDestination(Pointer<ActiveMQTempDestination> destination) { |
| this->config->activeTempDestinations.remove(destination); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::deleteTempDestination(Pointer<ActiveMQTempDestination> destination) { |
| |
| try { |
| |
| if (destination == NULL) { |
| throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL"); |
| } |
| |
| checkClosedOrFailed(); |
| ensureConnectionInfoSent(); |
| |
| this->config->sessionsLock.readLock().lock(); |
| try { |
| Pointer<Iterator<Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator()); |
| while (iterator->hasNext()) { |
| Pointer<ActiveMQSessionKernel> session = iterator->next(); |
| if (session->isInUse(destination)) { |
| this->config->sessionsLock.readLock().unlock(); |
| throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination"); |
| } |
| } |
| this->config->sessionsLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->sessionsLock.readLock().unlock(); |
| throw; |
| } |
| |
| this->config->activeTempDestinations.remove(destination); |
| |
| Pointer<DestinationInfo> command(new DestinationInfo()); |
| |
| command->setConnectionId(this->config->connectionInfo->getConnectionId()); |
| command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION); |
| command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure())); |
| |
| // Send the message to the broker. |
| syncRequest(command); |
| } |
| AMQ_CATCH_RETHROW(NullPointerException) |
| AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::cleanUpTempDestinations() { |
| |
| if (this->config->activeTempDestinations.isEmpty()) { |
| return; |
| } |
| |
| ArrayList< Pointer<ActiveMQTempDestination> > tempDests(this->config->activeTempDestinations.values()); |
| Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(tempDests.iterator()); |
| while (iterator->hasNext()) { |
| Pointer<ActiveMQTempDestination> dest = iterator->next(); |
| |
| try { |
| // Only delete this temporary destination if it was created from this connection, since the |
| // advisory consumer tracks all temporary destinations there can be others in our mapping that |
| // this connection did not create. |
| std::string thisConnectionId = |
| this->config->connectionInfo->getConnectionId() != NULL ? this->config->connectionInfo->getConnectionId()->toString() : ""; |
| if (dest->getConnectionId() == thisConnectionId) { |
| this->deleteTempDestination(dest); |
| } |
| } catch (Exception& ex) { |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isDeleted(Pointer<ActiveMQTempDestination> destination) const { |
| |
| if (this->config->advisoryConsumer == NULL) { |
| return false; |
| } |
| |
| return !this->config->activeTempDestinations.containsKey(destination); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) { |
| |
| if (this->config->checkForDuplicates) { |
| return this->config->connectionAudit.isDuplicate(dispatcher, message); |
| } |
| |
| return false; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeAuditedDispatcher(Dispatcher* dispatcher) { |
| this->config->connectionAudit.removeDispatcher(dispatcher); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) { |
| this->config->connectionAudit.rollbackDuplicate(dispatcher, message); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isAlwaysSessionAsync() const { |
| return this->config->alwaysSessionAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) { |
| this->config->alwaysSessionAsync = alwaysSessionAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQConnection::getProtocolVersion() const { |
| return this->config->protocolVersion->get(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnection::isConsumerExpiryCheckEnabled() { |
| return this->config->consumerExpiryCheckEnabled; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) { |
| this->config->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; |
| } |