blob: dc0cdb990342ec693cd8728204d0f84688314cc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQSession.h"
#include <activemq/exceptions/InvalidStateException.h>
#include <activemq/exceptions/NullPointerException.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQTransaction.h>
#include <activemq/core/ActiveMQConsumer.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/core/ActiveMQProducer.h>
#include <activemq/core/ActiveMQSessionExecutor.h>
#include <activemq/util/Boolean.h>
#include <activemq/connector/TransactionInfo.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::exceptions;
////////////////////////////////////////////////////////////////////////////////
ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
const Properties& properties,
ActiveMQConnection* connection )
{
if( sessionInfo == NULL || connection == NULL )
{
throw NullPointerException(
__FILE__, __LINE__,
"ActiveMQSession::ActiveMQSession - Init with NULL data");
}
this->sessionInfo = sessionInfo;
this->transaction = NULL;
this->connection = connection;
this->closed = false;
// Create a Transaction object only if the session is transactional
if( isTransacted() )
{
transaction =
new ActiveMQTransaction(connection, this, properties );
}
// Create the session executor object.
executor = new ActiveMQSessionExecutor( this );
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQSession::~ActiveMQSession()
{
try
{
// Destroy this session's resources
close();
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::fire( exceptions::ActiveMQException& ex ) {
if( connection != NULL ) {
connection->fire( ex );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::close() throw ( cms::CMSException )
{
// If we're already close, just get outta' here.
if( closed ) {
return;
}
try
{
// Stop the dispatch executor.
stop();
// Get the complete list of closeable session resources.
// Get the complete list of closeable session resources.
synchronized( &closableSessionResources ) {
Iterator<cms::Closeable*>* iter = closableSessionResources.iterator();
while( iter->hasNext() ) {
cms::Closeable* resource = iter->next();
try{
resource->close();
} catch( cms::CMSException& ex ){
/* Absorb */
}
}
delete iter;
}
// Destroy the Transaction
if( transaction != NULL ){
delete transaction;
transaction = NULL;
}
// Remove this sessions from the connector
connection->removeSession( this );
delete sessionInfo;
sessionInfo = NULL;
// Now indicate that this session is closed.
closed = true;
delete executor;
executor = NULL;
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::commit() throw ( cms::CMSException )
{
try
{
if( closed || !isTransacted() )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::commit - This Session Can't Commit");
}
// Commit the Transaction
transaction->commit();
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::rollback() throw ( cms::CMSException )
{
try
{
if( closed || !isTransacted() )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::rollback - This Session Can't Rollback" );
}
// Rollback the Transaction
transaction->rollback();
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createConsumer - Session Already Closed" );
}
return createConsumer( destination, "", false );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination,
const std::string& selector )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createConsumer - Session Already Closed" );
}
return createConsumer( destination, selector, false );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createConsumer - Session Already Closed" );
}
ConsumerInfo* consumerInfo =
connection->getConnectionData()->getConnector()->
createConsumer( destination,
sessionInfo,
selector,
noLocal );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( consumerInfo ) );
// Create the consumer instance.
ActiveMQConsumer* consumer = new ActiveMQConsumer(
consumerInfo, this );
// Add the consumer to the map.
synchronized( &consumers ) {
consumers.setValue( consumerInfo->getConsumerId(), consumer );
}
// Register this as a message dispatcher for the consumer.
connection->addDispatcher( consumerInfo, this );
// Start the Consumer, we are now ready to receive messages
try{
connection->getConnectionData()->getConnector()->startConsumer(
consumerInfo );
} catch( ActiveMQException& ex ) {
synchronized( &consumers ) {
consumers.remove( consumerInfo->getConsumerId() );
}
delete consumer;
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
return consumer;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
const cms::Topic* destination,
const std::string& name,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createDurableConsumer - Session Already Closed" );
}
ConsumerInfo* consumerInfo =
connection->getConnectionData()->getConnector()->
createDurableConsumer(
destination, sessionInfo, name, selector, noLocal );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( consumerInfo ) );
// Create the consumer instance.
ActiveMQConsumer* consumer = new ActiveMQConsumer(
consumerInfo, this );
// Add the consumer to the map.
synchronized( &consumers ) {
consumers.setValue( consumerInfo->getConsumerId(), consumer );
}
// Register this as a message dispatcher for the consumer.
connection->addDispatcher( consumerInfo, this );
// Start the Consumer, we are now ready to receive messages
try{
connection->getConnectionData()->getConnector()->startConsumer(
consumerInfo );
} catch( ActiveMQException& ex ) {
synchronized( &consumers ) {
consumers.remove( consumerInfo->getConsumerId() );
}
delete consumer;
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
return consumer;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageProducer* ActiveMQSession::createProducer(
const cms::Destination* destination )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createProducer - Session Already Closed" );
}
ProducerInfo* producerInfo =
connection->getConnectionData()->getConnector()->
createProducer( destination, sessionInfo );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( producerInfo ) );
// Create the producer instance.
ActiveMQProducer* producer = new ActiveMQProducer(
producerInfo, this );
return producer;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::Queue* ActiveMQSession::createQueue( const std::string& queueName )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createQueue - Session Already Closed" );
}
cms::Queue* queue = connection->getConnectionData()->
getConnector()->createQueue( queueName, sessionInfo );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( queue ) );
return queue;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::Topic* ActiveMQSession::createTopic( const std::string& topicName )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createTopic - Session Already Closed");
}
cms::Topic* topic = connection->getConnectionData()->
getConnector()->createTopic( topicName, sessionInfo );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( topic ) );
return topic;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue()
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createTemporaryQueue - "
"Session Already Closed" );
}
// Create the consumer instance.
cms::TemporaryQueue* queue =
connection->getConnectionData()->
getConnector()->createTemporaryQueue( sessionInfo );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( queue ) );
return queue;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic()
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createTemporaryTopic - "
"Session Already Closed" );
}
// Create the consumer instance.
cms::TemporaryTopic* topic =
connection->getConnectionData()->
getConnector()->createTemporaryTopic( sessionInfo );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( topic ) );
return topic;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQSession::createMessage()
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createMessage - Session Already Closed" );
}
cms::Message* message = connection->getConnectionData()->
getConnector()->createMessage( sessionInfo, transaction );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( message ) );
return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::BytesMessage* ActiveMQSession::createBytesMessage()
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createBytesMessage - Session Already Closed" );
}
cms::BytesMessage* message = connection->getConnectionData()->
getConnector()->createBytesMessage( sessionInfo, transaction );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( message ) );
return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::BytesMessage* ActiveMQSession::createBytesMessage(
const unsigned char* bytes,
std::size_t bytesSize )
throw ( cms::CMSException )
{
try
{
BytesMessage* msg = createBytesMessage();
msg->setBodyBytes( bytes, bytesSize );
return msg;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::TextMessage* ActiveMQSession::createTextMessage()
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createTextMessage - Session Already Closed" );
}
cms::TextMessage* message = connection->getConnectionData()->
getConnector()->createTextMessage( sessionInfo, transaction );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( message ) );
return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text )
throw ( cms::CMSException )
{
try
{
TextMessage* msg = createTextMessage();
msg->setText( text.c_str() );
return msg;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::MapMessage* ActiveMQSession::createMapMessage()
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createMapMessage - Session Already Closed" );
}
cms::MapMessage* message = connection->getConnectionData()->
getConnector()->createMapMessage( sessionInfo, transaction );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( message ) );
return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode() const
{
return sessionInfo != NULL ?
sessionInfo->getAckMode() : Session::AUTO_ACKNOWLEDGE;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQSession::isTransacted() const
{
return sessionInfo != NULL ?
sessionInfo->getAckMode() == Session::SESSION_TRANSACTED : false;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::acknowledge( ActiveMQConsumer* consumer,
ActiveMQMessage* message )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::acknowledgeMessage - Session Already Closed" );
}
// Stores the Message and its consumer in the tranasction, if the
// session is a transactional one.
if( isTransacted() )
{
transaction->addToTransaction( message, consumer );
}
// Delegate to connector to ack this message.
return connection->getConnectionData()->
getConnector()->acknowledge(
sessionInfo,
consumer->getConsumerInfo(),
dynamic_cast< cms::Message* >( message ) );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::send( cms::Message* message, ActiveMQProducer* producer )
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::onProducerClose - Session Already Closed" );
}
// Send via the connection syncrhronously.
connection->getConnectionData()->
getConnector()->send( message, producer->getProducerInfo() );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::onConnectorResourceClosed(
const ConnectorResource* resource ) throw ( cms::CMSException ) {
try{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::onProducerClose - Session Already Closed");
}
const ConsumerInfo* consumer =
dynamic_cast<const ConsumerInfo*>( resource );
if( consumer != NULL )
{
// If the executor thread is currently running, stop it.
bool wasStarted = isStarted();
if( wasStarted ) {
stop();
}
// Remove the dispatcher for the Connection
connection->removeDispatcher( consumer );
// Remove this consumer from the Transaction if we are
// transactional
if( transaction != NULL ) {
transaction->removeFromTransaction(
consumer->getConsumerId() );
}
ActiveMQConsumer* obj = NULL;
synchronized( &consumers ) {
if( consumers.containsKey( consumer->getConsumerId() ) ) {
// Get the consumer reference
obj = consumers.getValue( consumer->getConsumerId() );
// Remove this consumer from the map.
consumers.remove( consumer->getConsumerId() );
}
}
// Clean up any resources in the executor for this consumer
if( obj != NULL && executor != NULL ) {
// Purge any pending messages for this consumer.
vector<ActiveMQMessage*> messages =
executor->purgeConsumerMessages(obj);
// Destroy the messages.
for( unsigned int ix=0; ix<messages.size(); ++ix ) {
delete messages[ix];
}
}
// If the executor thread was previously running, start it back
// up.
if( wasStarted ) {
start();
}
}
// Remove the entry from the session resource map if it's there
const cms::Closeable* closeable =
dynamic_cast<const cms::Closeable*>( resource );
if( closeable != NULL ){
synchronized( &closableSessionResources ) {
closableSessionResources.remove(
const_cast<cms::Closeable*>( closeable ) );
}
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
cms::ExceptionListener* ActiveMQSession::getExceptionListener()
{
if( connection != NULL )
{
return connection->getExceptionListener();
}
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::unsubscribe( const std::string& name )
throw ( CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQSession::createConsumer - Session Already Closed" );
}
// Delegate to the connector.
connection->getConnectionData()->getConnector()->unsubscribe( name );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::checkConnectorResource(
connector::ConnectorResource* resource ) {
if( resource == NULL ) {
return;
}
// Add the consumer to the map of closeable session resources.
synchronized( &closableSessionResources ) {
closableSessionResources.add( resource );
}
// Register as a Listener
resource->addListener( this );
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::dispatch( DispatchData& message ) {
if( executor != NULL ) {
executor->execute( message );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::redispatch( util::Queue<DispatchData>& unconsumedMessages )
{
util::Queue<DispatchData> reversedList;
// Copy the list in reverse order then clear the original list.
synchronized( &unconsumedMessages ) {
unconsumedMessages.reverse( reversedList );
unconsumedMessages.clear();
}
// Add the list to the front of the executor.
while( !reversedList.empty() ) {
DispatchData data = reversedList.pop();
executor->executeFirst( data );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::start()
{
if( executor != NULL ) {
executor->start();
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::stop()
{
if( executor != NULL ) {
executor->stop();
}
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQSession::isStarted() const
{
if( executor == NULL ) {
return false;
}
return executor->isStarted();
}