| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ActiveMQSessionKernel.h" |
| |
| #include <activemq/exceptions/ActiveMQException.h> |
| #include <activemq/core/ActiveMQConstants.h> |
| #include <activemq/core/ActiveMQConnection.h> |
| #include <activemq/core/ActiveMQTransactionContext.h> |
| #include <activemq/core/ActiveMQConsumer.h> |
| #include <activemq/core/ActiveMQProducer.h> |
| #include <activemq/core/ActiveMQQueueBrowser.h> |
| #include <activemq/core/ActiveMQSessionExecutor.h> |
| #include <activemq/core/PrefetchPolicy.h> |
| #include <activemq/util/ActiveMQProperties.h> |
| #include <activemq/util/ActiveMQMessageTransformation.h> |
| #include <activemq/util/CMSExceptionSupport.h> |
| |
| #include <activemq/commands/ConsumerInfo.h> |
| #include <activemq/commands/DestinationInfo.h> |
| #include <activemq/commands/ExceptionResponse.h> |
| #include <activemq/commands/ActiveMQDestination.h> |
| #include <activemq/commands/ActiveMQTopic.h> |
| #include <activemq/commands/ActiveMQQueue.h> |
| #include <activemq/commands/ActiveMQTempDestination.h> |
| #include <activemq/commands/ActiveMQMessage.h> |
| #include <activemq/commands/ActiveMQBytesMessage.h> |
| #include <activemq/commands/ActiveMQTextMessage.h> |
| #include <activemq/commands/ActiveMQMapMessage.h> |
| #include <activemq/commands/ActiveMQStreamMessage.h> |
| #include <activemq/commands/ActiveMQTempTopic.h> |
| #include <activemq/commands/ActiveMQTempQueue.h> |
| #include <activemq/commands/RemoveInfo.h> |
| #include <activemq/commands/ProducerInfo.h> |
| #include <activemq/commands/RemoveSubscriptionInfo.h> |
| |
| #include <decaf/lang/Boolean.h> |
| #include <decaf/lang/Integer.h> |
| #include <decaf/lang/Runnable.h> |
| #include <decaf/lang/Long.h> |
| #include <decaf/lang/Math.h> |
| #include <decaf/util/Queue.h> |
| #include <decaf/util/LinkedList.h> |
| #include <decaf/util/concurrent/Mutex.h> |
| #include <decaf/util/concurrent/atomic/AtomicBoolean.h> |
| #include <decaf/util/concurrent/locks/ReentrantReadWriteLock.h> |
| #include <decaf/lang/exceptions/InvalidStateException.h> |
| #include <decaf/lang/exceptions/NullPointerException.h> |
| |
| using namespace std; |
| using namespace activemq; |
| using namespace activemq::util; |
| using namespace activemq::core; |
| using namespace activemq::core::kernels; |
| using namespace activemq::commands; |
| using namespace activemq::exceptions; |
| using namespace activemq::threads; |
| using namespace decaf::util; |
| using namespace decaf::util::concurrent; |
| using namespace decaf::util::concurrent::atomic; |
| using namespace decaf::lang; |
| using namespace decaf::lang::exceptions; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| namespace activemq{ |
| namespace core{ |
| namespace kernels{ |
| |
| class CloseSynhcronization; |
| |
| class SessionConfig { |
| private: |
| |
| SessionConfig(const SessionConfig&); |
| SessionConfig& operator=(const SessionConfig&); |
| |
| public: |
| |
| AtomicBoolean synchronizationRegistered; |
| decaf::util::concurrent::locks::ReentrantReadWriteLock producerLock; |
| decaf::util::LinkedList< Pointer<ActiveMQProducerKernel> > producers; |
| decaf::util::concurrent::locks::ReentrantReadWriteLock consumerLock; |
| decaf::util::LinkedList< Pointer<ActiveMQConsumerKernel> > consumers; |
| Pointer<Scheduler> scheduler; |
| Pointer<CloseSynhcronization> closeSync; |
| Mutex sendMutex; |
| cms::MessageTransformer* transformer; |
| int hashCode; |
| bool sessionAsyncDispatch; |
| |
| public: |
| |
| SessionConfig() : synchronizationRegistered(false), |
| producerLock(), producers(), consumerLock(), consumers(), |
| scheduler(), closeSync(), sendMutex(), transformer(NULL), |
| hashCode(), sessionAsyncDispatch(true) {} |
| ~SessionConfig() {} |
| }; |
| |
| /** |
| * Class used to clear a Consumer's dispatch queue asynchronously from the |
| * connection class's Scheduler instance. |
| */ |
| class ClearConsumerTask : public Runnable { |
| private: |
| |
| Pointer<ActiveMQConsumerKernel> consumer; |
| |
| private: |
| |
| ClearConsumerTask(const ClearConsumerTask&); |
| ClearConsumerTask& operator=(const ClearConsumerTask&); |
| |
| public: |
| |
| ClearConsumerTask(Pointer<ActiveMQConsumerKernel> consumer) : Runnable(), consumer(consumer) { |
| |
| if (consumer == NULL) { |
| throw NullPointerException( |
| __FILE__, __LINE__, "Synchronization Created with NULL Consumer."); |
| } |
| } |
| |
| virtual ~ClearConsumerTask() {} |
| |
| virtual void run() { |
| this->consumer->clearMessagesInProgress(); |
| } |
| }; |
| |
| /** |
| * Class used to Hook a session that has been closed into the Transaction |
| * it is currently a part of. Once the Transaction has been Committed or |
| * Rolled back this Synchronization can finish the Close of the session. |
| */ |
| class CloseSynhcronization : public Synchronization { |
| private: |
| |
| ActiveMQSessionKernel* session; |
| SessionConfig* config; |
| |
| private: |
| |
| CloseSynhcronization(const CloseSynhcronization&); |
| CloseSynhcronization& operator=(const CloseSynhcronization&); |
| |
| public: |
| |
| CloseSynhcronization(ActiveMQSessionKernel* session, SessionConfig* config) : |
| Synchronization(), session(session), config(config) { |
| |
| if (session == NULL || config == NULL) { |
| throw NullPointerException( |
| __FILE__, __LINE__, "Synchronization Created with NULL Session."); |
| } |
| } |
| |
| virtual ~CloseSynhcronization() {} |
| |
| virtual void beforeEnd() { |
| } |
| |
| virtual void afterCommit() { |
| config->closeSync.release(); |
| session->doClose(); |
| config->synchronizationRegistered.set(false); |
| } |
| |
| virtual void afterRollback() { |
| config->closeSync.release(); |
| session->doClose(); |
| config->synchronizationRegistered.set(false); |
| } |
| }; |
| |
| }}} |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQSessionKernel::ActiveMQSessionKernel(ActiveMQConnection* connection, |
| const Pointer<SessionId>& id, |
| cms::Session::AcknowledgeMode ackMode, |
| const Properties& properties) : config(new SessionConfig), |
| sessionInfo(), |
| transaction(), |
| connection(connection), |
| closed(false), |
| executor(), |
| ackMode(ackMode), |
| producerIds(), |
| producerSequenceIds(), |
| consumerIds(), |
| lastDeliveredSequenceId(-2) { |
| |
| if (id == NULL || connection == NULL) { |
| throw ActiveMQException( |
| __FILE__, __LINE__, |
| "ActiveMQSessionKernel::ActiveMQSessionKernel - Constructor called with NULL data"); |
| } |
| |
| this->sessionInfo.reset(new SessionInfo()); |
| this->sessionInfo->setAckMode(ackMode); |
| this->sessionInfo->setSessionId(id); |
| |
| this->config->hashCode = id->getHashCode(); |
| |
| try { |
| this->connection->oneway(this->sessionInfo); |
| } catch (...) { |
| this->sessionInfo.reset(NULL); |
| delete this->config; |
| throw; |
| } |
| |
| this->config->sessionAsyncDispatch = connection->isAlwaysSessionAsync(); |
| |
| // Create a Transaction object |
| this->transaction.reset(new ActiveMQTransactionContext(this, properties)); |
| |
| // Create the session executor object. |
| this->executor.reset(new ActiveMQSessionExecutor(this)); |
| |
| // Use the Connection's Scheduler. |
| this->config->scheduler = this->connection->getScheduler(); |
| |
| // If the connection is already started, start the session. |
| if (this->connection->isStarted()) { |
| try { |
| this->start(); |
| } catch (...) { |
| this->transaction.reset(NULL); |
| this->executor.reset(NULL); |
| delete this->config; |
| throw; |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQSessionKernel::~ActiveMQSessionKernel() { |
| try { |
| close(); |
| } |
| AMQ_CATCHALL_NOTHROW() |
| |
| try { |
| // Free the executor here so that its threads are gone before any of the |
| // other member data of this class is destroyed as it might be accessing |
| // from its run thread. |
| this->executor.reset(NULL); |
| } |
| AMQ_CATCHALL_NOTHROW() |
| |
| try { |
| delete this->config; |
| } |
| AMQ_CATCHALL_NOTHROW() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::fire(const ActiveMQException& ex) { |
| if (connection != NULL) { |
| connection->fire(ex); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::close() { |
| |
| // If we're already closed, just return. |
| if (this->closed.get()) { |
| return; |
| } |
| |
| try { |
| |
| if (this->transaction->isInXATransaction()) { |
| if (!this->config->synchronizationRegistered.compareAndSet(false, true)) { |
| this->config->closeSync.reset(new CloseSynhcronization(this, this->config)); |
| this->transaction->addSynchronization(this->config->closeSync); |
| } |
| } else { |
| doClose(); |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::doClose() { |
| |
| try { |
| dispose(); |
| |
| // Remove this session from the Broker. |
| Pointer<RemoveInfo> info(new RemoveInfo()); |
| info->setObjectId(this->sessionInfo->getSessionId()); |
| info->setLastDeliveredSequenceId(this->lastDeliveredSequenceId); |
| this->connection->oneway(info); |
| } |
| AMQ_CATCH_RETHROW( ActiveMQException ) |
| AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) |
| AMQ_CATCHALL_THROW( ActiveMQException ) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::dispose() { |
| |
| // Prevent Dispose loop if transaction has a close synchronization registered. |
| if (!closed.compareAndSet(false, true)) { |
| return; |
| } |
| |
| class Finalizer { |
| private: |
| |
| ActiveMQSessionKernel* session; |
| ActiveMQConnection* connection; |
| |
| private: |
| |
| Finalizer(const Finalizer&); |
| Finalizer& operator=(const Finalizer&); |
| |
| public: |
| |
| Finalizer(ActiveMQSessionKernel* session, ActiveMQConnection* connection) : |
| session(session), connection(connection) { |
| } |
| |
| ~Finalizer() { |
| Pointer<ActiveMQSessionKernel> session(this->session); |
| try { |
| this->connection->removeSession(session); |
| } catch(...) { |
| session.release(); |
| } |
| session.release(); |
| } |
| }; |
| |
| try { |
| |
| Finalizer final(this, this->connection); |
| |
| // Stop the dispatch executor. |
| stop(); |
| |
| // Dispose of all Consumers, the dispose method skips the RemoveInfo command. |
| this->config->consumerLock.writeLock().lock(); |
| try { |
| // We have to copy all the consumers to another list since we aren't using a |
| // CopyOnWriteArrayList right now. |
| ArrayList<Pointer<ActiveMQConsumerKernel> > consumers(this->config->consumers); |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > consumerIter(consumers.iterator()); |
| while (consumerIter->hasNext()) { |
| try{ |
| Pointer<ActiveMQConsumerKernel> consumer = consumerIter->next(); |
| consumer->setFailureError(this->connection->getFirstFailureError()); |
| consumer->dispose(); |
| this->lastDeliveredSequenceId = |
| Math::max(this->lastDeliveredSequenceId, consumer->getLastDeliveredSequenceId()); |
| } catch (cms::CMSException& ex) { |
| /* Absorb */ |
| } |
| } |
| this->config->consumers.clear(); |
| this->config->consumerLock.writeLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.writeLock().unlock(); |
| throw; |
| } |
| |
| // Dispose of all Producers, the dispose method skips the RemoveInfo command. |
| this->config->producerLock.writeLock().lock(); |
| try { |
| // We have to copy all the producers to another list since we aren't using a |
| // CopyOnWriteArrayList right now. |
| ArrayList<Pointer<ActiveMQProducerKernel> > producers(this->config->producers); |
| std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(producers.iterator()); |
| |
| while (producerIter->hasNext()) { |
| try{ |
| producerIter->next()->dispose(); |
| } catch (cms::CMSException& ex) { |
| /* Absorb */ |
| } |
| } |
| this->config->producers.clear(); |
| this->config->producerLock.writeLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->producerLock.writeLock().unlock(); |
| throw; |
| } |
| |
| // Roll Back the transaction since we were closed without an explicit call |
| // to commit it. |
| if (this->transaction->isInTransaction()) { |
| this->transaction->rollback(); |
| } |
| } |
| AMQ_CATCH_RETHROW( ActiveMQException ) |
| AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) |
| AMQ_CATCHALL_THROW( ActiveMQException ) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::commit() { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| if (!this->isTransacted()) { |
| throw ActiveMQException( |
| __FILE__, __LINE__, "ActiveMQSessionKernel::commit - This Session is not Transacted"); |
| } |
| |
| // Commit the Transaction |
| this->transaction->commit(); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::rollback() { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| if (!this->isTransacted()) { |
| throw ActiveMQException( |
| __FILE__, __LINE__, "ActiveMQSessionKernel::rollback - This Session is not Transacted"); |
| } |
| |
| // Roll back the Transaction |
| this->transaction->rollback(); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::recover() { |
| |
| try { |
| |
| checkClosed(); |
| |
| if (isTransacted()) { |
| throw cms::IllegalStateException("This session is transacted"); |
| } |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| consumer->rollback(); |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::clearMessagesInProgress(Pointer<AtomicInteger> transportsInterrupted) { |
| |
| if (this->executor.get() != NULL) { |
| this->executor->clearMessagesInProgress(); |
| } |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| consumer->inProgressClearRequired(); |
| transportsInterrupted->incrementAndGet(); |
| this->connection->getScheduler()->executeAfterDelay( |
| new ClearConsumerTask(consumer), 0LL); |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::acknowledge() { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| consumer->acknowledge(); |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::deliverAcks() { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| consumer->deliverAcks(); |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destination* destination) { |
| |
| try { |
| this->checkClosed(); |
| return this->createConsumer(destination, "", false); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destination* destination, const std::string& selector) { |
| |
| try { |
| this->checkClosed(); |
| return this->createConsumer(destination, selector, false); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destination* destination, |
| const std::string& selector, bool noLocal) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| // Cast the destination to an OpenWire destination, so we can |
| // get all the goodies. |
| const ActiveMQDestination* amqDestination = |
| dynamic_cast<const ActiveMQDestination*>( destination ); |
| |
| if (amqDestination == NULL) { |
| throw ActiveMQException(__FILE__, __LINE__, "Destination was either NULL or not created by this CMS Client"); |
| } |
| |
| Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() ); |
| |
| int prefetch = 0; |
| if (dest->isTopic()) { |
| prefetch = this->connection->getPrefetchPolicy()->getTopicPrefetch(); |
| } else { |
| prefetch = this->connection->getPrefetchPolicy()->getQueuePrefetch(); |
| } |
| |
| // Create the consumer instance. |
| Pointer<ActiveMQConsumerKernel> consumer( |
| new ActiveMQConsumerKernel(this, this->getNextConsumerId(), |
| dest, "", selector, prefetch, 0, noLocal, |
| false, this->connection->isDispatchAsync(), NULL)); |
| |
| try{ |
| this->addConsumer(consumer); |
| this->connection->syncRequest(consumer->getConsumerInfo()); |
| } catch (Exception& ex) { |
| this->removeConsumer(consumer); |
| throw; |
| } |
| |
| consumer->setMessageTransformer(this->config->transformer); |
| |
| if (this->connection->isStarted()) { |
| consumer->start(); |
| } |
| |
| return new ActiveMQConsumer(consumer); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageConsumer* ActiveMQSessionKernel::createDurableConsumer(const cms::Topic* destination, const std::string& name, |
| const std::string& selector, bool noLocal) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| // Cast the destination to an OpenWire destination, so we can |
| // get all the goodies. |
| const ActiveMQDestination* amqDestination = dynamic_cast<const ActiveMQDestination*> (destination); |
| |
| if (amqDestination == NULL) { |
| throw ActiveMQException(__FILE__, __LINE__, "Destination was either NULL or not created by this CMS Client"); |
| } |
| |
| Pointer<ActiveMQDestination> dest(amqDestination->cloneDataStructure()); |
| |
| // Create the consumer instance. |
| Pointer<ActiveMQConsumerKernel> consumer( |
| new ActiveMQConsumerKernel(this, this->getNextConsumerId(), |
| dest, name, selector, |
| this->connection->getPrefetchPolicy()->getDurableTopicPrefetch(), |
| 0, noLocal, false, this->connection->isDispatchAsync(), NULL)); |
| |
| try { |
| this->addConsumer(consumer); |
| this->connection->syncRequest(consumer->getConsumerInfo()); |
| } catch (Exception& ex) { |
| this->removeConsumer(consumer); |
| throw; |
| } |
| |
| consumer->setMessageTransformer(this->config->transformer); |
| |
| if (this->connection->isStarted()) { |
| consumer->start(); |
| } |
| |
| return new ActiveMQConsumer(consumer); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageProducer* ActiveMQSessionKernel::createProducer(const cms::Destination* destination ) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| Pointer<commands::ActiveMQDestination> dest; |
| |
| // Producers are allowed to have NULL destinations. In this case, the |
| // destination is specified by the messages as they are sent. |
| if (destination != NULL) { |
| |
| const ActiveMQDestination* amqDestination = |
| dynamic_cast<const ActiveMQDestination*> (destination); |
| |
| if (amqDestination == NULL) { |
| throw ActiveMQException( |
| __FILE__, __LINE__, |
| "Destination was either NULL or not created by this CMS Client" ); |
| } |
| |
| // Cast the destination to an OpenWire destination, so we can |
| // get all the goodies. |
| dest.reset(amqDestination->cloneDataStructure()); |
| } |
| |
| // Create the producer instance. |
| Pointer<ActiveMQProducerKernel> producer(new ActiveMQProducerKernel( |
| this, this->getNextProducerId(), dest, this->connection->getSendTimeout())); |
| |
| try { |
| this->addProducer(producer); |
| this->connection->oneway(producer->getProducerInfo()); |
| } catch (Exception& ex) { |
| this->removeProducer(producer); |
| throw; |
| } |
| |
| producer->setMessageTransformer(this->config->transformer); |
| |
| return new ActiveMQProducer(producer); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue ) { |
| |
| try { |
| return ActiveMQSessionKernel::createBrowser(queue, ""); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue, const std::string& selector) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| // Cast the destination to an OpenWire destination, so we can |
| // get all the goodies. |
| const ActiveMQDestination* amqDestination = |
| dynamic_cast<const ActiveMQDestination*>(queue); |
| |
| if (amqDestination == NULL) { |
| throw ActiveMQException(__FILE__, __LINE__, "Destination was either NULL or not created by this CMS Client"); |
| } |
| |
| Pointer<ActiveMQDestination> dest(amqDestination->cloneDataStructure()); |
| |
| // Create the QueueBrowser instance |
| std::auto_ptr<ActiveMQQueueBrowser> browser( |
| new ActiveMQQueueBrowser(this, this->getNextConsumerId(), dest, |
| selector, this->connection->isDispatchAsync())); |
| |
| return browser.release(); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Queue* ActiveMQSessionKernel::createQueue(const std::string& queueName) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| if (queueName == "") { |
| throw IllegalArgumentException( |
| __FILE__, __LINE__, "Destination Name cannot be the Empty String." ); |
| } |
| |
| if (queueName.find(commands::ActiveMQDestination::TEMP_DESTINATION_NAME_PREFIX) == 0) { |
| return new ActiveMQTempQueue(queueName); |
| } |
| |
| return new commands::ActiveMQQueue(queueName); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Topic* ActiveMQSessionKernel::createTopic(const std::string& topicName) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| if (topicName == "") { |
| throw IllegalArgumentException( |
| __FILE__, __LINE__, "Destination Name cannot be the Empty String." ); |
| } |
| |
| if (topicName.find(commands::ActiveMQDestination::TEMP_DESTINATION_NAME_PREFIX) == 0) { |
| return new ActiveMQTempTopic(topicName); |
| } |
| |
| return new commands::ActiveMQTopic(topicName); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::TemporaryQueue* ActiveMQSessionKernel::createTemporaryQueue() { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| std::auto_ptr<commands::ActiveMQTempQueue> queue(new |
| commands::ActiveMQTempQueue(this->createTemporaryDestinationName())); |
| |
| // Register it with the Broker |
| this->createTemporaryDestination(queue.get()); |
| |
| return queue.release(); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::TemporaryTopic* ActiveMQSessionKernel::createTemporaryTopic() { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| std::auto_ptr<commands::ActiveMQTempTopic> topic(new |
| commands::ActiveMQTempTopic(createTemporaryDestinationName())); |
| |
| // Register it with the Broker |
| this->createTemporaryDestination(topic.get()); |
| |
| return topic.release(); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Message* ActiveMQSessionKernel::createMessage() { |
| |
| try { |
| this->checkClosed(); |
| commands::ActiveMQMessage* message = new commands::ActiveMQMessage(); |
| message->setConnection(this->connection); |
| return message; |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::BytesMessage* ActiveMQSessionKernel::createBytesMessage() { |
| |
| try { |
| this->checkClosed(); |
| commands::ActiveMQBytesMessage* message = new commands::ActiveMQBytesMessage(); |
| message->setConnection(this->connection); |
| return message; |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::BytesMessage* ActiveMQSessionKernel::createBytesMessage(const unsigned char* bytes, int bytesSize) { |
| |
| try { |
| this->checkClosed(); |
| cms::BytesMessage* msg = createBytesMessage(); |
| msg->setBodyBytes(bytes, bytesSize); |
| return msg; |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::StreamMessage* ActiveMQSessionKernel::createStreamMessage() { |
| |
| try { |
| this->checkClosed(); |
| commands::ActiveMQStreamMessage* message = new commands::ActiveMQStreamMessage(); |
| message->setConnection(this->connection); |
| return message; |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::TextMessage* ActiveMQSessionKernel::createTextMessage() { |
| |
| try { |
| this->checkClosed(); |
| commands::ActiveMQTextMessage* message = new commands::ActiveMQTextMessage(); |
| message->setConnection(this->connection); |
| return message; |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::TextMessage* ActiveMQSessionKernel::createTextMessage(const std::string& text) { |
| |
| try { |
| this->checkClosed(); |
| cms::TextMessage* msg = createTextMessage(); |
| msg->setText(text.c_str()); |
| return msg; |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MapMessage* ActiveMQSessionKernel::createMapMessage() { |
| |
| try { |
| this->checkClosed(); |
| commands::ActiveMQMapMessage* message = new commands::ActiveMQMapMessage(); |
| message->setConnection(this->connection); |
| return message; |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Session::AcknowledgeMode ActiveMQSessionKernel::getAcknowledgeMode() const { |
| return this->ackMode; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQSessionKernel::isTransacted() const { |
| return (this->ackMode == Session::SESSION_TRANSACTED) || this->transaction->isInXATransaction(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination, |
| cms::Message* message, int deliveryMode, int priority, long long timeToLive, |
| util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| if (destination->isTemporary()) { |
| Pointer<ActiveMQTempDestination> tempDest = destination.dynamicCast<ActiveMQTempDestination>(); |
| if (this->connection->isDeleted(tempDest)) { |
| throw cms::InvalidDestinationException( |
| std::string("Cannot publish to a deleted Destination: ") + destination->toString()); |
| } |
| } |
| |
| synchronized(&this->config->sendMutex) { |
| |
| // Ensure that a new transaction is started if this is the first message |
| // sent since the last commit, Broker is notified of a new TX. |
| doStartTransaction(); |
| |
| Pointer<TransactionId> txId = this->transaction->getTransactionId(); |
| Pointer<ProducerInfo> producerInfo = producer->getProducerInfo(); |
| Pointer<ProducerId> producerId = producerInfo->getProducerId(); |
| long long sequenceId = producer->getNextMessageSequence(); |
| |
| // Set the "CMS" header fields on the original message, see JMS 1.1 spec section 3.4.11 |
| message->setCMSDeliveryMode(deliveryMode); |
| long long expiration = 0LL; |
| if (!producer->getDisableMessageTimeStamp()) { |
| long long timeStamp = System::currentTimeMillis(); |
| message->setCMSTimestamp(timeStamp); |
| if (timeToLive > 0) { |
| expiration = timeToLive + timeStamp; |
| } |
| } |
| message->setCMSExpiration(expiration); |
| message->setCMSPriority(priority); |
| message->setCMSRedelivered(false); |
| |
| // transform to our own message format here |
| commands::Message* transformed = NULL; |
| Pointer<commands::Message> amqMessage; |
| |
| // Always assign the message ID, regardless of the disable flag. |
| // Not adding a message ID will cause an NPE at the broker. |
| decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId()); |
| id->setProducerId(producerId); |
| id->setProducerSequenceId(sequenceId); |
| |
| // NOTE: |
| // Now we copy the message before sending, this allows the user to reuse the |
| // message object without interfering with the copy that's being sent. We |
| // could make this step optional to increase performance but for now we won't. |
| // To not do this implies that the user must never reuse the message object, or |
| // know that the configuration of Transports doesn't involve the message hanging |
| // around beyond the point that send returns. When the transform step results in |
| // a new Message object being created we can just use that new instance, but when |
| // the original cms::Message pointer was already a commands::Message then we need |
| // to clone it. |
| if (ActiveMQMessageTransformation::transformMessage(message, connection, &transformed)) { |
| amqMessage.reset(transformed); |
| } else { |
| amqMessage.reset(transformed->cloneDataStructure()); |
| } |
| |
| // Sets the Message ID on the original message per spec. |
| message->setCMSMessageID(id->toString()); |
| message->setCMSDestination(destination.dynamicCast<cms::Destination>().get()); |
| |
| amqMessage->setMessageId(id); |
| amqMessage->getBrokerPath().clear(); |
| amqMessage->setTransactionId(txId); |
| amqMessage->setConnection(this->connection); |
| |
| // destination format is provider specific so only set on transformed message |
| amqMessage->setDestination(destination); |
| |
| amqMessage->onSend(); |
| amqMessage->setProducerId(producerId); |
| |
| if (onComplete == NULL && sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() && |
| (!amqMessage->isPersistent() || this->connection->isUseAsyncSend() || amqMessage->getTransactionId() != NULL)) { |
| |
| // No Response Required, send is asynchronous. |
| this->connection->oneway(amqMessage); |
| |
| if (producerWindow != NULL) { |
| producerWindow->enqueueUsage(amqMessage->getSize()); |
| } |
| |
| } else { |
| if (sendTimeout > 0 && onComplete == NULL) { |
| this->connection->syncRequest(amqMessage, (unsigned int)sendTimeout); |
| } else { |
| this->connection->asyncRequest(amqMessage, onComplete); |
| } |
| } |
| } |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::ExceptionListener* ActiveMQSessionKernel::getExceptionListener() { |
| |
| if (connection != NULL) { |
| return connection->getExceptionListener(); |
| } |
| |
| return NULL; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::setMessageTransformer(cms::MessageTransformer* transformer) { |
| this->config->transformer = transformer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageTransformer* ActiveMQSessionKernel::getMessageTransformer() const { |
| return this->config->transformer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<Scheduler> ActiveMQSessionKernel::getScheduler() const { |
| return this->config->scheduler; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::unsubscribe(const std::string& name) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| Pointer<RemoveSubscriptionInfo> rsi(new RemoveSubscriptionInfo()); |
| |
| rsi->setConnectionId(this->connection->getConnectionInfo().getConnectionId()); |
| rsi->setSubcriptionName(name); |
| rsi->setClientId(this->connection->getConnectionInfo().getClientId()); |
| |
| // Send the message to the broker. |
| this->connection->syncRequest(rsi); |
| } |
| AMQ_CATCH_ALL_THROW_CMSEXCEPTION() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::dispatch(const Pointer<MessageDispatch>& dispatch) { |
| |
| if (this->executor.get() != NULL) { |
| this->executor->execute(dispatch); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::redispatch(MessageDispatchChannel& unconsumedMessages) { |
| |
| std::vector< Pointer<MessageDispatch> > messages = unconsumedMessages.removeAll(); |
| std::vector< Pointer<MessageDispatch> >::reverse_iterator iter = messages.rbegin(); |
| |
| for (; iter != messages.rend(); ++iter) { |
| executor->executeFirst(*iter); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::start() { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| consumer->start(); |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| |
| if (this->executor.get() != NULL) { |
| this->executor->start(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::stop() { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| consumer->stop(); |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| |
| if (this->executor.get() != NULL) { |
| this->executor->stop(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQSessionKernel::isStarted() const { |
| |
| if (this->executor.get() == NULL) { |
| return false; |
| } |
| |
| return this->executor->isRunning(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination) { |
| |
| try { |
| |
| Pointer<DestinationInfo> command(new DestinationInfo()); |
| command->setConnectionId(this->connection->getConnectionInfo().getConnectionId()); |
| command->setOperationType(ActiveMQConstants::DESTINATION_ADD_OPERATION); |
| command->setDestination(Pointer<ActiveMQTempDestination> (tempDestination->cloneDataStructure())); |
| |
| // Send the message to the broker. |
| this->syncRequest(command); |
| |
| // Now that its setup, link it to this Connection so it can be closed. |
| tempDestination->setConnection(this->connection); |
| this->connection->addTempDestination(Pointer<ActiveMQTempDestination>(tempDestination->cloneDataStructure())); |
| } |
| AMQ_CATCH_RETHROW( ActiveMQException ) |
| AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) |
| AMQ_CATCHALL_THROW( ActiveMQException ) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQSessionKernel::isInUse(Pointer<ActiveMQDestination> destination) { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| if (consumer->isInUse(destination)) { |
| this->config->consumerLock.readLock().unlock(); |
| return true; |
| } |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| |
| return false; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::destroyTemporaryDestination( |
| commands::ActiveMQTempDestination* tempDestination) { |
| |
| try { |
| |
| Pointer<DestinationInfo> command(new DestinationInfo()); |
| |
| command->setConnectionId(this->connection->getConnectionInfo().getConnectionId()); |
| command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION); |
| command->setDestination(Pointer<ActiveMQTempDestination> (tempDestination->cloneDataStructure())); |
| |
| // Send the message to the broker. |
| this->connection->syncRequest(command); |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| std::string ActiveMQSessionKernel::createTemporaryDestinationName() { |
| |
| try { |
| return this->connection->getConnectionId().getValue() + ":" + |
| Long::toString(this->connection->getNextTempDestinationId()); |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::oneway(Pointer<Command> command) { |
| |
| try { |
| this->connection->oneway(command); |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<Response> ActiveMQSessionKernel::syncRequest(Pointer<Command> command, unsigned int timeout) { |
| |
| try { |
| this->checkClosed(); |
| return this->connection->syncRequest(command, timeout); |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::checkClosed() const { |
| if (this->closed.get()) { |
| throw ActiveMQException(__FILE__, __LINE__, "ActiveMQSessionKernel - Session Already Closed"); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::addConsumer(Pointer<ActiveMQConsumerKernel> consumer) { |
| |
| try { |
| |
| this->checkClosed(); |
| |
| this->config->consumerLock.writeLock().lock(); |
| try { |
| this->config->consumers.add(consumer); |
| this->config->consumerLock.writeLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.writeLock().unlock(); |
| throw; |
| } |
| |
| // Register this as a message dispatcher for the consumer. |
| this->connection->addDispatcher(consumer->getConsumerInfo()->getConsumerId(), this); |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::removeConsumer(Pointer<ActiveMQConsumerKernel> consumer) { |
| |
| try { |
| this->connection->removeDispatcher(consumer->getConsumerId()); |
| this->config->consumerLock.writeLock().lock(); |
| try { |
| this->config->consumers.remove(consumer); |
| this->connection->removeAuditedDispatcher(consumer.get()); |
| this->config->consumerLock.writeLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.writeLock().unlock(); |
| throw; |
| } |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::addProducer(Pointer<ActiveMQProducerKernel> producer) { |
| |
| try { |
| this->checkClosed(); |
| |
| this->config->producerLock.writeLock().lock(); |
| try { |
| this->config->producers.add(producer); |
| this->config->producerLock.writeLock().unlock(); |
| } catch(Exception& ex) { |
| this->config->producerLock.writeLock().unlock(); |
| throw; |
| } |
| |
| this->connection->addProducer(producer); |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::removeProducer(Pointer<ActiveMQProducerKernel> producer) { |
| |
| try { |
| this->connection->removeProducer(producer->getProducerId()); |
| this->config->producerLock.writeLock().lock(); |
| try { |
| this->config->producers.remove(producer); |
| this->config->producerLock.writeLock().unlock(); |
| } catch(Exception& ex) { |
| this->config->producerLock.writeLock().unlock(); |
| throw; |
| } |
| } |
| AMQ_CATCH_RETHROW(ActiveMQException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) |
| AMQ_CATCHALL_THROW(ActiveMQException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<ActiveMQProducerKernel> ActiveMQSessionKernel::lookupProducerKernel(Pointer<ProducerId> id) { |
| |
| this->config->producerLock.readLock().lock(); |
| try { |
| |
| std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator()); |
| |
| while (producerIter->hasNext()) { |
| Pointer<ActiveMQProducerKernel> producer = producerIter->next(); |
| if (producer->getProducerId()->equals(*id)) { |
| this->config->producerLock.readLock().unlock(); |
| return producer; |
| } |
| } |
| |
| this->config->producerLock.readLock().unlock(); |
| } catch(Exception& ex) { |
| this->config->producerLock.readLock().unlock(); |
| throw; |
| } |
| |
| return Pointer<ActiveMQProducerKernel>(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<ActiveMQConsumerKernel> ActiveMQSessionKernel::lookupConsumerKernel(Pointer<ConsumerId> id) { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| if (consumer->getConsumerId()->equals(*id)) { |
| this->config->consumerLock.readLock().unlock(); |
| return consumer; |
| } |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| |
| return Pointer<ActiveMQConsumerKernel>(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQSessionKernel::iterateConsumers() { |
| |
| if (this->closed.get()) { |
| return false; |
| } |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| if (consumer->iterate()) { |
| this->config->consumerLock.readLock().unlock(); |
| return true; |
| } |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| |
| return false; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::setPrefetchSize(Pointer<ConsumerId> id, int prefetch) { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| if (consumer->getConsumerId()->equals(*id)) { |
| consumer->setPrefetchSize(prefetch); |
| } |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::close(Pointer<ConsumerId> id) { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| if (consumer->getConsumerId()->equals(*id)) { |
| try { |
| consumer->close(); |
| } catch (cms::CMSException& e) { |
| } |
| } |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::doStartTransaction() { |
| |
| if (this->isTransacted() && !this->transaction->isInXATransaction()) { |
| this->transaction->begin(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::wakeup() { |
| |
| if (this->executor.get() != NULL) { |
| this->executor->wakeup(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<commands::ConsumerId> ActiveMQSessionKernel::getNextConsumerId() { |
| Pointer<ConsumerId> consumerId(new commands::ConsumerId()); |
| |
| consumerId->setConnectionId(this->connection->getConnectionId().getValue()); |
| consumerId->setSessionId(this->sessionInfo->getSessionId()->getValue()); |
| consumerId->setValue(this->consumerIds.getNextSequenceId()); |
| |
| return consumerId; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<commands::ProducerId> ActiveMQSessionKernel::getNextProducerId() { |
| Pointer<ProducerId> producerId(new ProducerId()); |
| |
| producerId->setConnectionId(this->connection->getConnectionId().getValue()); |
| producerId->setSessionId(this->sessionInfo->getSessionId()->getValue()); |
| producerId->setValue(this->producerIds.getNextSequenceId()); |
| |
| return producerId; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQSessionKernel::getHashCode() const { |
| return this->config->hashCode; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::checkMessageListener() const { |
| |
| this->config->consumerLock.readLock().lock(); |
| try { |
| Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); |
| while (iter->hasNext()) { |
| Pointer<ActiveMQConsumerKernel> consumer = iter->next(); |
| if (consumer->getMessageListener() != NULL) { |
| throw cms::IllegalStateException( |
| "Cannot synchronously receive a message when a MessageListener is set"); |
| } |
| } |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::sendAck(Pointer<MessageAck> ack, bool async) { |
| if (async || this->connection->isSendAcksAsync() || this->isTransacted()) { |
| this->connection->oneway(ack); |
| } else { |
| this->connection->syncRequest(ack); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQSessionKernel::isSessionAsyncDispatch() const { |
| return this->config->sessionAsyncDispatch; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQSessionKernel::setSessionAsyncDispatch(bool sessionAsyncDispatch) { |
| this->config->sessionAsyncDispatch = sessionAsyncDispatch; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| decaf::util::ArrayList< Pointer<ActiveMQConsumerKernel> > ActiveMQSessionKernel::getConsumers() const { |
| ArrayList< Pointer<ActiveMQConsumerKernel> > result; |
| this->config->consumerLock.readLock().lock(); |
| try { |
| result.addAll(this->config->consumers); |
| this->config->consumerLock.readLock().unlock(); |
| } catch (Exception& ex) { |
| this->config->consumerLock.readLock().unlock(); |
| throw; |
| } |
| |
| return result; |
| } |