blob: bff8e3cef14d42654ec5b8f48452f126eb27c39e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CmsTemplate.h"
#include "ProducerCallback.h"
#include "MessageCreator.h"
#include <climits>
#include <iostream>
#include <cms/IllegalStateException.h>
#include <cms/CMSException.h>
using namespace cms;
using namespace activemq::cmsutil;
using namespace std;
/**
 * Emits a catch clause for exceptions of the given type. The handler
 * stamps the caught exception with the current file and line, makes a
 * best-effort call to destroy() on the supplied template (any failure of
 * that call is swallowed), and then throws a new CMSException that carries
 * the what() text of the original exception.
 *
 * @param type
 *      The type of the exception to catch (e.g. IllegalStateException).
 * @param t
 *      Pointer to the CmsTemplate instance that should be destroyed. The
 *      argument is parenthesised in the expansion so that any pointer
 *      expression may be passed safely.
 */
#define CMSTEMPLATE_CATCH( type, t ) \
    catch( type& ex ){ \
        ex.setMark(__FILE__, __LINE__); \
        try { \
            (t)->destroy(); \
        } catch(...) {} \
        throw CMSException(ex.what(), NULL); \
    }
/**
 * Emits the trailing catch clauses used after a try block. A
 * cms::CMSException is rethrown unchanged; anything else is replaced by a
 * new CMSException with the text "caught unknown exception". In both
 * cases a best-effort destroy() is first invoked on the supplied template
 * and any failure of that call is swallowed.
 *
 * @param t
 *      Pointer to the CmsTemplate instance that should be destroyed. The
 *      argument is parenthesised in the expansion so that any pointer
 *      expression may be passed safely.
 */
#define CMSTEMPLATE_CATCHALL(t) \
    catch( cms::CMSException& ){ \
        try { \
            (t)->destroy(); \
        } catch( ... ) {} \
        throw; \
    } catch(...){ \
        try { \
            (t)->destroy(); \
        } catch( ... ) {} \
        throw CMSException("caught unknown exception", NULL); \
    }
/**
 * Emits a catch-all clause with an empty body, i.e. whatever was thrown
 * from the preceding try block is silently discarded.
 */
#define CMSTEMPLATE_CATCHALL_NOTHROW( ) \
    catch( ... ) { \
    }
////////////////////////////////////////////////////////////////////////////////
// Receive timeout value that makes doReceive() call receiveNoWait().
const long long CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT = -1;
// Receive timeout value that makes doReceive() call the blocking receive()
// overload that takes no timeout; initDefaults() uses it as the default.
const long long CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0;
// Priority assigned by initDefaults().
const int CmsTemplate::DEFAULT_PRIORITY = 4;
// Time-to-live assigned by initDefaults().
const long long CmsTemplate::DEFAULT_TIME_TO_LIVE = 0;
////////////////////////////////////////////////////////////////////////////////
/**
 * Default constructor. Every member is given a neutral value in the
 * initializer list; initDefaults() then assigns the real defaults.
 */
CmsTemplate::CmsTemplate() :
    CmsDestinationAccessor(),
    connection(NULL),
    sessionPools(),
    defaultDestination(NULL),
    defaultDestinationName(""),
    messageIdEnabled(false),
    messageTimestampEnabled(false),
    noLocal(false),
    receiveTimeout(0),
    explicitQosEnabled(false),
    deliveryMode(0),
    priority(0),
    timeToLive(0),
    initialized(false) {

    this->initDefaults();
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Constructs a template bound to the given connection factory. Members
 * receive neutral values in the initializer list, initDefaults() assigns
 * the real defaults, and the factory is then handed to
 * setConnectionFactory().
 *
 * @param connectionFactory
 *      The factory passed on to setConnectionFactory().
 */
CmsTemplate::CmsTemplate( cms::ConnectionFactory* connectionFactory ) :
    CmsDestinationAccessor(),
    connection(NULL),
    sessionPools(),
    defaultDestination(NULL),
    defaultDestinationName(""),
    messageIdEnabled(false),
    messageTimestampEnabled(false),
    noLocal(false),
    receiveTimeout(0),
    explicitQosEnabled(false),
    deliveryMode(0),
    priority(0),
    timeToLive(0),
    initialized(false) {

    this->initDefaults();
    this->setConnectionFactory(connectionFactory);
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Destructor. Tears the template down through destroy(); an exception
 * must not escape a destructor, so any failure is deliberately absorbed.
 */
CmsTemplate::~CmsTemplate() {
    try {
        this->destroy();
    } catch (...) {
        // Absorb
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Puts every member of the template into its default state: not
 * initialized, no connection, no session pools, no default destination,
 * message IDs and timestamps enabled, persistent delivery with the
 * default priority and time-to-live, and an indefinite receive timeout.
 */
void CmsTemplate::initDefaults() {

    // Lifecycle state.
    initialized = false;

    // Destination settings.
    defaultDestination = NULL;
    defaultDestinationName = "";

    // Producer / consumer behaviour.
    messageIdEnabled = true;
    messageTimestampEnabled = true;
    noLocal = false;
    receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;

    // Quality-of-service values.
    explicitQosEnabled = false;
    deliveryMode = cms::DeliveryMode::PERSISTENT;
    priority = DEFAULT_PRIORITY;
    timeToLive = DEFAULT_TIME_TO_LIVE;

    // No connection has been created yet.
    connection = NULL;

    // No session pool has been created yet either.
    for (int slot = 0; slot < NUM_SESSION_POOLS; ++slot) {
        sessionPools[slot] = NULL;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Rebuilds the session pools: any existing pools are destroyed, then one
 * new SessionPool is created per slot, using the slot index as the
 * acknowledge mode of that pool.
 */
void CmsTemplate::createSessionPools() {

    // Start from a clean slate so no previously created pool is overwritten.
    destroySessionPools();

    for (int mode = 0; mode < NUM_SESSION_POOLS; ++mode) {
        sessionPools[mode] = new SessionPool(
            connection,
            static_cast<cms::Session::AcknowledgeMode>(mode),
            getResourceLifecycleManager());
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Deletes every session pool that currently exists and resets its slot to
 * NULL. Slots that are already NULL are skipped.
 */
void CmsTemplate::destroySessionPools() {

    for (int slot = 0; slot < NUM_SESSION_POOLS; ++slot) {

        if (sessionPools[slot] == NULL) {
            continue;
        }

        delete sessionPools[slot];
        sessionPools[slot] = NULL;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Initializes the template once. The first call delegates to
 * CmsDestinationAccessor::init() and then sets the initialized flag;
 * later calls do nothing while that flag is still set.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
void CmsTemplate::init() {
    try {

        if (initialized) {
            return;
        }

        // Let the base class perform its own initialization first.
        CmsDestinationAccessor::init();
        initialized = true;
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroy() {
try {
// Mark as not initialized.
initialized = false;
// Clear the connection reference
connection = NULL;
// Clear the reference to the default destination.
defaultDestination = NULL;
// Destroy the session pools.
destroySessionPools();
// Call the base class.
CmsDestinationAccessor::destroy();
}
CMSTEMPLATE_CATCH(IllegalStateException, this)
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Verifies that a default destination is available, either as an object
 * or as a name.
 *
 * @throws IllegalStateException if neither defaultDestination nor
 *         defaultDestinationName has been set.
 */
void CmsTemplate::checkDefaultDestination() {

    if (this->defaultDestination == NULL && this->defaultDestinationName.empty()) {
        // The two literals are concatenated by the compiler, so the
        // separating space must be part of one of them.
        throw IllegalStateException("No defaultDestination or defaultDestinationName specified. "
                                    "Check configuration of CmsTemplate.", NULL);
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Returns the default destination, resolving it from the default
 * destination name when no destination object has been set. A resolved
 * destination is stored with setDefaultDestination() so that later calls
 * can return it directly.
 *
 * @param session
 *      The session passed to resolveDestinationName().
 * @return the default destination.
 *
 * On failure (including a missing default destination) the handlers
 * destroy this template and throw a CMSException.
 */
cms::Destination* CmsTemplate::resolveDefaultDestination(cms::Session* session) {
    try {

        // Throws when neither a destination object nor a name is configured.
        checkDefaultDestination();

        cms::Destination* resolved = getDefaultDestination();
        if (resolved != NULL) {
            return resolved;
        }

        // Only the name was provided: resolve it and remember the result.
        resolved = resolveDestinationName(session, getDefaultDestinationName());
        setDefaultDestination(resolved);

        return resolved;
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Returns the connection used by this template, creating it on first use.
 * Creation obtains a connection from createConnection(), starts it and
 * then builds the session pools on top of it.
 *
 * @return the (possibly newly created) connection.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::Connection* CmsTemplate::getConnection() {
    try {

        // Fast path: the connection already exists.
        if (connection != NULL) {
            return connection;
        }

        connection = createConnection();
        connection->start();

        // The pools are created against the connection stored above.
        createSessionPools();

        return connection;
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Takes a session from the pool that is indexed by the current session
 * acknowledge mode.
 *
 * @return the session handed out by the pool.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
PooledSession* CmsTemplate::takeSession() {
    try {

        // Calling getConnection() guarantees that the connection and the
        // session pools exist before a pool is accessed.
        getConnection();

        return sessionPools[getSessionAcknowledgeMode()]->takeSession();
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Gives a session taken with takeSession() back by closing it, then
 * resets the caller's pointer to NULL. The session object is not deleted
 * here. A NULL session is ignored.
 *
 * @param session
 *      Reference to the session pointer; set to NULL on return.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
void CmsTemplate::returnSession(PooledSession*& session) {
    try {

        if (session != NULL) {
            // Close only - this is a pooled session, so it is not deleted.
            session->close();
            session = NULL;
        }
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Creates a producer for the given destination and applies this
 * template's message-ID and timestamp settings to it.
 *
 * @param session
 *      The session that creates the producer. When it is a PooledSession
 *      a cached producer is requested from it.
 * @param dest
 *      The target destination; NULL selects the default destination.
 * @return the configured producer.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::MessageProducer* CmsTemplate::createProducer(cms::Session* session, cms::Destination* dest) {
    try {

        // Fall back to the default destination when none was supplied.
        cms::Destination* target = dest;
        if (target == NULL) {
            target = resolveDefaultDestination(session);
        }

        // Cached producers are only available from a PooledSession.
        cms::MessageProducer* producer = NULL;
        PooledSession* pooled = dynamic_cast<PooledSession*>(session);
        if (pooled == NULL) {
            producer = session->createProducer(target);
        } else {
            producer = pooled->createCachedProducer(target);
        }

        // The producer API takes "disable" flags, hence the negation.
        producer->setDisableMessageID(!isMessageIdEnabled());
        producer->setDisableMessageTimeStamp(!isMessageTimestampEnabled());

        return producer;
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Creates a consumer for the given destination.
 *
 * @param session
 *      The session that creates the consumer. When it is a PooledSession
 *      a cached consumer is requested from it.
 * @param dest
 *      The source destination; NULL selects the default destination.
 * @param selector
 *      The message selector handed to the consumer factory call.
 * @param noLocal
 *      The noLocal flag handed to the consumer factory call (this
 *      parameter, not the member of the same name, is used).
 * @return the new consumer.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
                                                  cms::Destination* dest,
                                                  const std::string& selector,
                                                  bool noLocal) {
    try {

        // Fall back to the default destination when none was supplied.
        cms::Destination* source = dest;
        if (source == NULL) {
            source = resolveDefaultDestination(session);
        }

        // Cached consumers are only available from a PooledSession.
        cms::MessageConsumer* consumer = NULL;
        PooledSession* pooled = dynamic_cast<PooledSession*>(session);
        if (pooled == NULL) {
            consumer = session->createConsumer(source, selector, noLocal);
        } else {
            consumer = pooled->createCachedConsumer(source, selector, noLocal);
        }

        return consumer;
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Closes the given producer and deletes it unless it is a CachedProducer,
 * then resets the caller's pointer to NULL. Exceptions thrown by close()
 * are discarded. A NULL producer is ignored.
 *
 * @param producer
 *      Reference to the producer pointer; set to NULL on return.
 */
void CmsTemplate::destroyProducer(cms::MessageProducer*& producer) {

    if (producer != NULL) {

        try {
            producer->close();
        }
        CMSTEMPLATE_CATCHALL_NOTHROW()

        // Only producers that are not cached are deleted here.
        const bool isCached = dynamic_cast<CachedProducer*>(producer) != NULL;
        if (!isCached) {
            delete producer;
        }

        producer = NULL;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Closes the given consumer and deletes it unless it is a CachedConsumer,
 * then resets the caller's pointer to NULL. Exceptions thrown by close()
 * are discarded. A NULL consumer is ignored.
 *
 * @param consumer
 *      Reference to the consumer pointer; set to NULL on return.
 */
void CmsTemplate::destroyConsumer(cms::MessageConsumer*& consumer) {

    if (consumer != NULL) {

        try {
            consumer->close();
        }
        CMSTEMPLATE_CATCHALL_NOTHROW()

        // Only consumers that are not cached are deleted here.
        const bool isCached = dynamic_cast<CachedConsumer*>(consumer) != NULL;
        if (!isCached) {
            delete consumer;
        }

        consumer = NULL;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Deletes the given message and resets the caller's pointer to NULL.
 * A NULL message is ignored.
 *
 * @param message
 *      Reference to the message pointer; set to NULL on return.
 */
void CmsTemplate::destroyMessage(cms::Message*& message) {

    if (message != NULL) {
        delete message;
        message = NULL;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Runs the given session callback: makes sure the template is
 * initialized, takes a session, passes it to the callback and afterwards
 * returns the session. A NULL action is ignored.
 *
 * @param action
 *      The callback whose doInCms(session) is invoked.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
void CmsTemplate::execute(SessionCallback* action) {

    PooledSession* borrowed = NULL;

    try {

        if (action != NULL) {

            // Verify that we are initialized.
            init();

            borrowed = takeSession();
            action->doInCms(borrowed);
            returnSession(borrowed);
        }
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Runs the given producer callback inside a session. The callback is
 * wrapped in a ProducerExecutor built with a NULL destination (presumably
 * resulting in the default destination, since createProducer() resolves
 * the default when it is given NULL) and handed to the
 * SessionCallback overload of execute().
 *
 * @param action
 *      The producer callback to run.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
void CmsTemplate::execute(ProducerCallback* action) {
    try {

        // Verify that we are initialized.
        init();

        ProducerExecutor executor(action, this, NULL);
        execute(&executor);
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Runs the given producer callback inside a session, for the given
 * destination. The callback is wrapped in a ProducerExecutor and handed
 * to the SessionCallback overload of execute().
 *
 * @param dest
 *      The destination given to the ProducerExecutor.
 * @param action
 *      The producer callback to run.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
void CmsTemplate::execute(cms::Destination* dest, ProducerCallback* action) {
    try {

        // Verify that we are initialized.
        init();

        ProducerExecutor executor(action, this, dest);
        execute(&executor);
    }
    CMSTEMPLATE_CATCH(IllegalStateException, this)
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::execute(const std::string& destinationName, ProducerCallback* action) {
try {
// Verify that we are initialized
init();
// Create the callback.
ResolveProducerExecutor cb(action, this, destinationName);
// Execute the action in a session.
execute(&cb);
}
CMSTEMPLATE_CATCH(IllegalStateException, this)
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Session callback that creates a producer through the parent template,
 * passes it to the wrapped producer callback and destroys it afterwards.
 * A NULL session is ignored.
 *
 * @param session
 *      The session in which the producer is created.
 *
 * On failure the handlers destroy the parent template and throw a
 * CMSException.
 */
void CmsTemplate::ProducerExecutor::doInCms(cms::Session* session) {

    cms::MessageProducer* producer = NULL;

    try {

        if (session != NULL) {

            producer = parent->createProducer(session, getDestination(session));

            // Hand control to the user supplied callback.
            action->doInCms(session, producer);

            parent->destroyProducer(producer);
        }
    }
    CMSTEMPLATE_CATCHALL(parent)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Resolves this executor's destination name through the parent template.
 *
 * @param session
 *      The session passed to resolveDestinationName().
 * @return the resolved destination.
 *
 * On failure the handlers destroy the parent template and throw a
 * CMSException.
 */
cms::Destination* CmsTemplate::ResolveProducerExecutor::getDestination(cms::Session* session) {
    try {
        cms::Destination* resolved = parent->resolveDestinationName(session, destinationName);
        return resolved;
    }
    CMSTEMPLATE_CATCH(IllegalStateException, parent)
    CMSTEMPLATE_CATCHALL(parent)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Sends a message built by the given creator, using the ProducerCallback
 * overload of execute() that takes no destination.
 *
 * @param messageCreator
 *      Creates the message to send.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
void CmsTemplate::send(MessageCreator* messageCreator) {
    try {
        SendExecutor sender(messageCreator, this);
        execute(&sender);
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Sends a message built by the given creator to the given destination.
 *
 * @param dest
 *      The destination to send to.
 * @param messageCreator
 *      Creates the message to send.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
void CmsTemplate::send(cms::Destination* dest, MessageCreator* messageCreator) {
    try {
        SendExecutor sender(messageCreator, this);
        execute(dest, &sender);
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::send(const std::string& destinationName, MessageCreator* messageCreator) {
try {
SendExecutor senderExecutor(messageCreator, this);
execute(destinationName, &senderExecutor);
}
CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Creates a message with the given creator and sends it through the given
 * producer. When explicit QoS is enabled the template's delivery mode,
 * priority and time-to-live are passed to send(); otherwise the
 * single-argument send() is used. The message is destroyed afterwards,
 * whether or not sending succeeded.
 *
 * @param session
 *      The session handed to the message creator.
 * @param producer
 *      The producer to send with; when NULL nothing is done.
 * @param messageCreator
 *      Creates the message to send.
 *
 * @throws CMSException rethrown (after being marked with file and line)
 *         when message creation or sending fails. Any other exception is
 *         rethrown unchanged.
 */
void CmsTemplate::doSend(cms::Session* session, cms::MessageProducer* producer, MessageCreator* messageCreator) {

    cms::Message* message = NULL;

    try {

        if (producer == NULL) {
            return;
        }

        // Create the message.
        message = messageCreator->createMessage(session);

        // Send the message.
        if (isExplicitQosEnabled()) {
            producer->send(message, getDeliveryMode(), getPriority(), getTimeToLive());
        } else {
            producer->send(message);
        }

        // Destroy the resources.
        destroyMessage(message);

    } catch (CMSException& e) {
        e.setMark(__FILE__, __LINE__);
        // Destroy the resources.
        destroyMessage(message);
        throw;
    } catch (...) {
        // Without this handler the message would leak whenever something
        // other than a CMSException escapes from send().
        destroyMessage(message);
        throw;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Receives one message from the given consumer, honouring the configured
 * receive timeout:
 *  - RECEIVE_TIMEOUT_NO_WAIT calls receiveNoWait(),
 *  - RECEIVE_TIMEOUT_INDEFINITE_WAIT calls receive() without a timeout,
 *  - any other value is passed to receive(int).
 *
 * The timeout is stored as long long but receive() takes an int, so the
 * value is clamped to the range of int instead of being silently
 * truncated. Timeouts that already fit in an int are passed unchanged.
 *
 * @param consumer
 *      The consumer to receive from.
 * @return whatever the consumer's receive call returns.
 *
 * On failure (including a NULL consumer) the handlers destroy this
 * template and throw a CMSException.
 */
cms::Message* CmsTemplate::doReceive(cms::MessageConsumer* consumer) {
    try {

        if (consumer == NULL) {
            throw CMSException("consumer is NULL", NULL);
        }

        const long long receiveTime = getReceiveTimeout();

        if (receiveTime == RECEIVE_TIMEOUT_NO_WAIT) {
            return consumer->receiveNoWait();
        }

        if (receiveTime == RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
            return consumer->receive();
        }

        // Clamp rather than narrow: an out-of-range conversion to int
        // would produce an implementation-defined (possibly negative or
        // zero) timeout.
        long long bounded = receiveTime;
        if (bounded > static_cast<long long>(INT_MAX)) {
            bounded = INT_MAX;
        } else if (bounded < static_cast<long long>(INT_MIN)) {
            bounded = INT_MIN;
        }

        return consumer->receive(static_cast<int>(bounded));
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Session callback that creates a consumer through the parent template,
 * receives one message with it, stores that message in this executor and
 * destroys the consumer. The stored message is reset to NULL before
 * anything else happens. A NULL session is ignored.
 *
 * @param session
 *      The session in which the consumer is created.
 *
 * @throws CMSException rethrown (after being marked with file and line);
 *         the stored message is destroyed first.
 */
void CmsTemplate::ReceiveExecutor::doInCms(cms::Session* session) {

    cms::MessageConsumer* consumer = NULL;
    message = NULL;

    try {

        if (session != NULL) {

            consumer = parent->createConsumer(session, getDestination(session), selector, noLocal);

            message = parent->doReceive(consumer);

            parent->destroyConsumer(consumer);
        }

    } catch (CMSException& e) {
        e.setMark(__FILE__, __LINE__);
        // Do not hand a message to the caller when the operation failed.
        parent->destroyMessage(message);
        throw;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Resolves this executor's destination name through the parent template.
 *
 * @param session
 *      The session passed to resolveDestinationName().
 * @return the resolved destination.
 *
 * On failure the handlers destroy the parent template and throw a
 * CMSException.
 */
cms::Destination* CmsTemplate::ResolveReceiveExecutor::getDestination(cms::Session* session) {
    try {
        cms::Destination* resolved = parent->resolveDestinationName(session, destinationName);
        return resolved;
    }
    CMSTEMPLATE_CATCH(IllegalStateException, parent)
    CMSTEMPLATE_CATCHALL(parent)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Receives a message using a ReceiveExecutor built with a NULL
 * destination and an empty selector.
 *
 * @return the message obtained by the executor.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::Message* CmsTemplate::receive() {
    try {
        ReceiveExecutor executor(this, NULL, "", isNoLocal());
        execute(&executor);

        return executor.getMessage();
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Receives a message from the given destination, with an empty selector.
 *
 * @param destination
 *      The destination given to the ReceiveExecutor.
 * @return the message obtained by the executor.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::Message* CmsTemplate::receive(cms::Destination* destination) {
    try {
        ReceiveExecutor executor(this, destination, "", isNoLocal());
        execute(&executor);

        return executor.getMessage();
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Receives a message from the destination with the given name, with an
 * empty selector.
 *
 * @param destinationName
 *      The name of the destination to receive from.
 * @return the message obtained by the executor.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::Message* CmsTemplate::receive(const std::string& destinationName) {
    try {
        ResolveReceiveExecutor executor(this, "", isNoLocal(), destinationName);
        execute(&executor);

        return executor.getMessage();
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Receives a message that matches the given selector, using a
 * ReceiveExecutor built with a NULL destination.
 *
 * @param selector
 *      The message selector to apply.
 * @return the message obtained by the executor.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::Message* CmsTemplate::receiveSelected(const std::string& selector) {
    try {
        ReceiveExecutor executor(this, NULL, selector, isNoLocal());
        execute(&executor);

        return executor.getMessage();
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Receives a message that matches the given selector from the given
 * destination.
 *
 * @param destination
 *      The destination given to the ReceiveExecutor.
 * @param selector
 *      The message selector to apply.
 * @return the message obtained by the executor.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::Message* CmsTemplate::receiveSelected(cms::Destination* destination, const std::string& selector) {
    try {
        ReceiveExecutor executor(this, destination, selector, isNoLocal());
        execute(&executor);

        return executor.getMessage();
    }
    CMSTEMPLATE_CATCHALL(this)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Receives a message that matches the given selector from the destination
 * with the given name.
 *
 * @param destinationName
 *      The name of the destination to receive from.
 * @param selector
 *      The message selector to apply.
 * @return the message obtained by the executor.
 *
 * On failure the handlers destroy this template and throw a CMSException.
 */
cms::Message* CmsTemplate::receiveSelected(const std::string& destinationName, const std::string& selector) {
    try {
        ResolveReceiveExecutor executor(this, selector, isNoLocal(), destinationName);
        execute(&executor);

        return executor.getMessage();
    }
    CMSTEMPLATE_CATCHALL(this)
}