blob: bfd7338143151a81d999801c83b8ead491306f3b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CmsTemplate.h"
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/exceptions/ExceptionDefines.h>
#include "ProducerCallback.h"
#include "MessageCreator.h"
#include <iostream>
using namespace activemq::cmsutil;
using namespace activemq::exceptions;
using namespace decaf::lang::exceptions;
using namespace std;
/**
* Macro for catching an exception then rethrowing an
* ActiveMQException (which is a cms::CMSException).
* @param type
* The type of the exception to throw
* @param t
* The instance of CmsTemplate
* (e.g. ActiveMQException ).
*/
#define CMSTEMPLATE_CATCH( type, t ) \
catch( type& ex ){ \
ex.setMark(__FILE__, __LINE__); \
try { \
t->destroy(); \
} catch( ... ) {} \
throw ActiveMQException(ex); \
}
/**
* A catch-all that throws an ActiveMQException.
* @param t
* The instance of CmsTemplate
*/
#define CMSTEMPLATE_CATCHALL(t) \
catch( ... ){ \
ActiveMQException ex( __FILE__, __LINE__, \
"caught unknown exception" ); \
try { \
t->destroy(); \
} catch( ... ) {} \
throw ex; \
}
////////////////////////////////////////////////////////////////////////////////
CmsTemplate::CmsTemplate() {
initDefaults();
}
////////////////////////////////////////////////////////////////////////////////
CmsTemplate::CmsTemplate(cms::ConnectionFactory* connectionFactory) {
initDefaults();
setConnectionFactory(connectionFactory);
}
////////////////////////////////////////////////////////////////////////////////
CmsTemplate::~CmsTemplate() {
try {
destroy();
} catch( ... ) { /* Absorb */ }
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::initDefaults() {
initialized = false;
defaultDestination = NULL;
defaultDestinationName = "";
messageIdEnabled = true;
messageTimestampEnabled = true;
noLocal = false;
receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
explicitQosEnabled = false;
deliveryMode = cms::DeliveryMode::PERSISTENT;
priority = DEFAULT_PRIORITY;
timeToLive = DEFAULT_TIME_TO_LIVE;
// Initialize the connection object.
connection = NULL;
// Initialize the session pools.
for( int ix=0; ix<NUM_SESSION_POOLS; ++ix) {
sessionPools[ix] = NULL;
}
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::createSessionPools() {
// Make sure they're destroyed first.
destroySessionPools();
/**
* Create the session pools.
*/
for( int ix=0; ix<NUM_SESSION_POOLS; ++ix) {
sessionPools[ix] = new SessionPool(connection,
(cms::Session::AcknowledgeMode)ix,
getResourceLifecycleManager());
}
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroySessionPools() {
/**
* Destroy the session pools.
*/
for( int ix=0; ix<NUM_SESSION_POOLS; ++ix) {
if( sessionPools[ix] != NULL ) {
delete sessionPools[ix];
sessionPools[ix] = NULL;
}
}
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::init() throw (cms::CMSException, IllegalStateException) {
try {
if( !initialized ) {
// Invoke the base class.
CmsDestinationAccessor::init();
initialized = true;
}
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroy() throw (cms::CMSException, IllegalStateException) {
try {
// Mark as not initialized.
initialized = false;
// Clear the connection reference
connection = NULL;
// Clear the reference to the default destination.
defaultDestination = NULL;
// Destroy the session pools.
destroySessionPools();
// Call the base class.
CmsDestinationAccessor::destroy();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::checkDefaultDestination() throw (IllegalStateException) {
if (this->defaultDestination == NULL && this->defaultDestinationName.size()==0) {
throw IllegalStateException(
__FILE__, __LINE__,
"No defaultDestination or defaultDestinationName specified. Check configuration of CmsTemplate.");
}
}
////////////////////////////////////////////////////////////////////////////////
cms::Destination* CmsTemplate::resolveDefaultDestination(cms::Session* session)
throw (cms::CMSException) {
try {
// Make sure we have a default - otherwise throw.
checkDefaultDestination();
// First, check the destination object.
cms::Destination* dest = getDefaultDestination();
// If no default object was provided, the name was provided. Resolve
// the name and then set the destination object so we don't have to
// do this next time.
if( dest == NULL ) {
dest = resolveDestinationName(session, getDefaultDestinationName());
setDefaultDestination(dest);
}
return dest;
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::Connection* CmsTemplate::getConnection()
throw (cms::CMSException) {
try {
// If we don't have a connection, create one.
if( connection == NULL ) {
// Invoke the base class to create the connection and add it
// to the resource lifecycle manager.
connection = createConnection();
// Start the connection.
connection->start();
// Create the session pools, passing in this connection.
createSessionPools();
}
return connection;
}
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
PooledSession* CmsTemplate::takeSession()
throw (cms::CMSException) {
try {
// Get the connection resource to verify that the connection and session
// pools have been allocated.
getConnection();
// Take a session from the pool.
return sessionPools[getSessionAcknowledgeMode()]->takeSession();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::returnSession( PooledSession*& session )
throw (cms::CMSException) {
try {
if( session == NULL ) {
return;
}
// Close the session, but do not delete since it's a pooled session
session->close();
session = NULL;
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageProducer* CmsTemplate::createProducer(cms::Session* session,
cms::Destination* dest) throw (cms::CMSException) {
try {
// If no destination was provided, resolve the default.
if( dest == NULL ) {
dest = resolveDefaultDestination(session);
}
cms::MessageProducer* producer = NULL;
// Try to use a cached producer - requires that we're using a
// PooledSession
PooledSession* pooledSession = dynamic_cast<PooledSession*>(session);
if( pooledSession != NULL ) {
producer = pooledSession->createCachedProducer(dest);
} else {
producer = session->createProducer(dest);
}
// Set the default values on the producer.
producer->setDisableMessageID(!isMessageIdEnabled());
producer->setDisableMessageTimeStamp(!isMessageTimestampEnabled());
return producer;
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
cms::Destination* dest,
const std::string& selector,
bool noLocal ) throw (cms::CMSException) {
try {
// If no destination was provided, resolve the default.
if( dest == NULL ) {
dest = resolveDefaultDestination(session);
}
cms::MessageConsumer* consumer = NULL;
// Try to use a cached consumer - requires that we're using a
// PooledSession
PooledSession* pooledSession = dynamic_cast<PooledSession*>(session);
if( pooledSession != NULL ) {
consumer = pooledSession->createCachedConsumer(dest, selector, noLocal);
} else {
consumer = session->createConsumer(dest, selector, noLocal);
}
return consumer;
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroyProducer( cms::MessageProducer*& producer)
throw (cms::CMSException) {
if( producer == NULL ) {
return;
}
try {
// Close the producer, then destroy it.
producer->close();
}
AMQ_CATCH_NOTHROW( cms::CMSException )
// Destroy if it's not a cached producer.
CachedProducer* cachedProducer = dynamic_cast<CachedProducer*>(producer);
if( cachedProducer == NULL ) {
delete producer;
}
producer = NULL;
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroyConsumer( cms::MessageConsumer*& consumer)
throw (cms::CMSException) {
if( consumer == NULL ) {
return;
}
try {
// Close the producer, then destroy it.
consumer->close();
}
AMQ_CATCH_NOTHROW( cms::CMSException )
// Destroy if it's not a cached consumer.
CachedConsumer* cachedConsumer = dynamic_cast<CachedConsumer*>(consumer);
if( cachedConsumer == NULL ) {
delete consumer;
}
consumer = NULL;
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroyMessage( cms::Message*& message) {
if( message == NULL ) {
return;
}
// Destroy the message.
delete message;
message = NULL;
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::execute(SessionCallback* action) throw (cms::CMSException) {
PooledSession* pooledSession = NULL;
try {
if( action == NULL ) {
return;
}
// Verify that we are initialized
init();
// Take a session from the pool.
pooledSession = takeSession();
// Execute the action with the given session.
action->doInCms(pooledSession);
// Return the session to the pool.
returnSession(pooledSession);
}
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::execute(ProducerCallback* action) throw (cms::CMSException) {
try {
// Verify that we are initialized
init();
// Create the callback with using default destination.
ProducerExecutor cb(action, this, NULL);
// Execute the action in a session.
execute(&cb);
}
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::execute(cms::Destination* dest,
ProducerCallback* action) throw (cms::CMSException) {
try {
// Verify that we are initialized
init();
// Create the callback.
ProducerExecutor cb(action, this, dest);
// Execute the action in a session.
execute(&cb);
}
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::execute(const std::string& destinationName,
ProducerCallback* action) throw (cms::CMSException) {
try {
// Verify that we are initialized
init();
// Create the callback.
ResolveProducerExecutor cb(action, this, destinationName);
// Execute the action in a session.
execute(&cb);
}
CMSTEMPLATE_CATCH( IllegalStateException, this )
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::ProducerExecutor::doInCms( cms::Session* session )
throw (cms::CMSException) {
cms::MessageProducer* producer = NULL;
try {
if( session == NULL ) {
return;
}
// Create the producer.
producer = parent->createProducer(session, getDestination(session));
// Execute the action.
action->doInCms(session, producer);
// Destroy the producer.
parent->destroyProducer(producer);
}
CMSTEMPLATE_CATCH( ActiveMQException, parent )
CMSTEMPLATE_CATCHALL(parent)
}
////////////////////////////////////////////////////////////////////////////////
cms::Destination* CmsTemplate::ResolveProducerExecutor::getDestination(
cms::Session* session )
throw (cms::CMSException) {
try {
return parent->resolveDestinationName(session, destinationName);
}
CMSTEMPLATE_CATCH( ActiveMQException, parent )
CMSTEMPLATE_CATCH( IllegalStateException, parent )
CMSTEMPLATE_CATCHALL(parent)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::send(MessageCreator* messageCreator)
throw (cms::CMSException) {
try {
SendExecutor senderExecutor(messageCreator, this);
execute(&senderExecutor);
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::send(cms::Destination* dest,
MessageCreator* messageCreator)
throw (cms::CMSException) {
try {
SendExecutor senderExecutor(messageCreator, this);
execute(dest, &senderExecutor);
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::send(const std::string& destinationName,
MessageCreator* messageCreator)
throw (cms::CMSException) {
try {
SendExecutor senderExecutor(messageCreator, this);
execute(destinationName, &senderExecutor);
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::doSend(cms::Session* session, cms::MessageProducer* producer,
MessageCreator* messageCreator) throw (cms::CMSException) {
cms::Message* message = NULL;
try {
if( producer == NULL ) {
return;
}
// Create the message.
message = messageCreator->createMessage(session);
// Send the message.
if( isExplicitQosEnabled() ) {
producer->send(message, getDeliveryMode(), getPriority(), getTimeToLive());
} else {
producer->send(message);
}
// Destroy the resources.
destroyMessage(message);
} catch( ActiveMQException& e) {
e.setMark(__FILE__, __LINE__ );
// Destroy the resources.
destroyMessage(message);
throw e;
}
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* CmsTemplate::doReceive(cms::MessageConsumer* consumer )
throw (cms::CMSException) {
try {
if( consumer == NULL ) {
throw new ActiveMQException(__FILE__, __LINE__, "consumer is NULL");
}
long long receiveTime = getReceiveTimeout();
switch( receiveTime ) {
case RECEIVE_TIMEOUT_NO_WAIT: {
return consumer->receiveNoWait();
}
case RECEIVE_TIMEOUT_INDEFINITE_WAIT: {
return consumer->receive();
}
default: {
return consumer->receive((int)receiveTime);
}
}
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::ReceiveExecutor::doInCms(cms::Session* session)
throw (cms::CMSException) {
cms::MessageConsumer* consumer = NULL;
message = NULL;
try {
// Create the consumer resource.
consumer = parent->createConsumer(session, getDestination(session), selector, noLocal);
// Receive the message.
message = parent->doReceive(consumer);
// Destroy the consumer resource.
parent->destroyConsumer(consumer);
} catch( ActiveMQException& e) {
e.setMark(__FILE__, __LINE__ );
// Destroy the message resource.
parent->destroyMessage(message);
throw e;
}
}
////////////////////////////////////////////////////////////////////////////////
cms::Destination* CmsTemplate::ResolveReceiveExecutor::getDestination(
cms::Session* session )
throw (cms::CMSException) {
try {
return parent->resolveDestinationName(session, destinationName);
}
CMSTEMPLATE_CATCH( ActiveMQException, parent )
CMSTEMPLATE_CATCH( IllegalStateException, parent )
CMSTEMPLATE_CATCHALL(parent)
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* CmsTemplate::receive()
throw (cms::CMSException) {
try {
ReceiveExecutor receiveExecutor(this, NULL,
"", isNoLocal());
execute(&receiveExecutor);
return receiveExecutor.getMessage();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* CmsTemplate::receive(cms::Destination* destination )
throw (cms::CMSException) {
try {
ReceiveExecutor receiveExecutor(this, destination,
"", isNoLocal());
execute(&receiveExecutor);
return receiveExecutor.getMessage();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* CmsTemplate::receive(const std::string& destinationName)
throw (cms::CMSException) {
try {
ResolveReceiveExecutor receiveExecutor(this,
"", isNoLocal(),
destinationName);
execute(&receiveExecutor);
return receiveExecutor.getMessage();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* CmsTemplate::receiveSelected(const std::string& selector)
throw (cms::CMSException) {
try {
ReceiveExecutor receiveExecutor(this, NULL,
selector, isNoLocal());
execute(&receiveExecutor);
return receiveExecutor.getMessage();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* CmsTemplate::receiveSelected(cms::Destination* destination,
const std::string& selector)
throw (cms::CMSException) {
try {
ReceiveExecutor receiveExecutor(this, destination,
selector, isNoLocal());
execute(&receiveExecutor);
return receiveExecutor.getMessage();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* CmsTemplate::receiveSelected(const std::string& destinationName,
const std::string& selector)
throw (cms::CMSException) {
try {
ResolveReceiveExecutor receiveExecutor(this,
selector, isNoLocal(),
destinationName);
execute(&receiveExecutor);
return receiveExecutor.getMessage();
}
CMSTEMPLATE_CATCH( ActiveMQException, this )
CMSTEMPLATE_CATCHALL(this)
}