blob: 4f593dbc86b572665296a9297b754ed4bedc1bfa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQTransaction.h"
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConsumer.h>
#include <activemq/core/ActiveMQMessage.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/Integer.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::exceptions;
using namespace activemq::connector;
using namespace decaf::lang::exceptions;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
ActiveMQTransaction::ActiveMQTransaction( ActiveMQConnection* connection,
ActiveMQSession* session,
const Properties& properties ) {
try {
if( connection == NULL || session == NULL ) {
throw NullPointerException(
__FILE__, __LINE__,
"ActiveMQTransaction::ActiveMQTransaction - "
"Initialized with a NULL connection data");
}
// Store State Data
this->connection = connection;
this->session = session;
// convert from property Strings to int.
maxRedeliveries = Integer::parseInt(
properties.getProperty( "transaction.maxRedeliveryCount", "5" ) );
// Start a new Transaction
transactionInfo = connection->getConnectionData()->
getConnector()->startTransaction( session->getSessionInfo() );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQTransaction::~ActiveMQTransaction() {
try{
// Inform the connector we are rolling back before we close so that
// the provider knows we didn't complete this transaction
connection->getConnectionData()->getConnector()->
rollback( transactionInfo, session->getSessionInfo() );
// Clean up
clearTransaction();
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::clearTransaction() {
try{
if( transactionInfo != NULL ) {
// Dispose of the ProducerInfo
transactionInfo->close();
delete transactionInfo;
}
synchronized( &rollbackLock ) {
// If there are any messages that are being transacted, then
// they die once and for all here.
RollbackMap::iterator itr = rollbackMap.begin();
for( ; itr != rollbackMap.end(); ++itr ) {
MessageList::iterator msgItr = itr->second.begin();
for( ; msgItr != itr->second.end(); ++msgItr ) {
delete *msgItr;
}
}
rollbackMap.clear();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
ActiveMQConsumer* consumer ) {
synchronized( &rollbackLock ) {
// Store in the Multi Map
rollbackMap[consumer].push_back( message );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::addToTransaction( cms::Message* message,
ActiveMQConsumer* consumer ) {
synchronized( &rollbackLock ) {
ActiveMQMessage* coreMessage = dynamic_cast<ActiveMQMessage*>( message );
if( coreMessage == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Message is not a core::ActiveMQMessage derivation" );
}
// Store in the Multi Map
rollbackMap[consumer].push_back( coreMessage );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::removeFromTransaction( ActiveMQConsumer* consumer ) {
try{
// Delete all the messages, then remove the consumer's entry from
// the Rollback Map.
synchronized( &rollbackLock ) {
RollbackMap::iterator rb_itr = rollbackMap.find( consumer );
if( rb_itr == rollbackMap.end() ) {
return;
}
MessageList::iterator itr = rb_itr->second.begin();
for( ; itr != rollbackMap[consumer].end(); ++itr ) {
delete *itr;
}
// Erase the entry from the map
rollbackMap.erase( consumer );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::removeFromTransaction( long long consumerId ) {
try {
// Delete all the messages, then remove the consumer's entry from
// the Rollback Map.
synchronized( &rollbackLock ) {
RollbackMap::iterator iter = rollbackMap.begin();
for( ; iter != rollbackMap.end(); ++iter ) {
long long id = iter->first->getConsumerInfo()->getConsumerId();
if( id == consumerId ) {
removeFromTransaction( iter->first );
return;
}
}
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::commit()
throw ( activemq::exceptions::ActiveMQException ) {
try{
if( this->transactionInfo == NULL ) {
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQTransaction::begin - "
"Commit called before transaction was started.");
}
// Stop any deliveries
session->stop();
// Now that the session is stopped, ack all messages we've
// delivered to the clients and placed in the Rollback map.
synchronized( &rollbackLock ) {
RollbackMap::iterator itr = rollbackMap.begin();
for(; itr != rollbackMap.end(); ++itr) {
ackMessages( itr->first, itr->second );
}
}
// Commit the current Transaction
connection->getConnectionData()->getConnector()->
commit( transactionInfo, session->getSessionInfo() );
// Clean out the Transaction
clearTransaction();
// Start a new Transaction
transactionInfo = connection->getConnectionData()->
getConnector()->startTransaction( session->getSessionInfo() );
// Stop any deliveries
session->start();
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::rollback()
throw ( activemq::exceptions::ActiveMQException ) {
try{
if( this->transactionInfo == NULL ) {
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQTransaction::rollback - "
"Rollback called before transaction was started.");
}
// Stop any Deliveries
session->stop();
// Rollback the Transaction
connection->getConnectionData()->getConnector()->
rollback( transactionInfo, session->getSessionInfo() );
// Dispose of the TransactionInfo
transactionInfo->close();
delete transactionInfo;
// Start a new Transaction
transactionInfo = connection->getConnectionData()->
getConnector()->startTransaction( session->getSessionInfo() );
// Start Deliveries
session->start();
// Roolback the messages to the Session, since we have the lock on the
// rollbackLock, then no message will added to the transaction until we
// are done processing all the messages that we to re-deliver and the map
// is cleared.
synchronized( &rollbackLock ) {
RollbackMap::iterator itr = rollbackMap.begin();
for(; itr != rollbackMap.end(); ++itr) {
redeliverMessages( itr->first, itr->second );
}
// Clear the map. Ownership of the messages is now handed off
// to the rollback tasks.
rollbackMap.clear();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::redeliverMessages( ActiveMQConsumer* consumer,
MessageList& messages )
throw ( activemq::exceptions::ActiveMQException ) {
try {
MessageList::iterator itr = messages.begin();
for( ; itr != messages.end(); ++itr ) {
ActiveMQMessage* message = *itr;
message->setRedeliveryCount( message->getRedeliveryCount() + 1 );
if( message->getRedeliveryCount() >= maxRedeliveries ) {
// Poison Ack the Message, we give up processing this one
connection->getConnectionData()->getConnector()->
acknowledge(
session->getSessionInfo(),
consumer->getConsumerInfo(),
dynamic_cast<Message*>( message ),
Connector::ACK_TYPE_POISON );
// Won't be re-delivered so it must be destroyed here.
delete message;
continue;
}
DispatchData data( consumer->getConsumerInfo(), message );
session->dispatch( data );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::ackMessages( ActiveMQConsumer* consumer,
MessageList& messages )
throw ( activemq::exceptions::ActiveMQException ) {
try {
std::list<const cms::Message*> cmsMessages;
MessageList::iterator iter = messages.begin();
for( ; iter != messages.end(); ++iter ) {
cmsMessages.insert( cmsMessages.end(),
dynamic_cast<const cms::Message*>( *iter ) );
}
// Acknowledge the Messages let the connector do it in the most
// efficient manner it can for large message block acks.
connection->getConnectionData()->getConnector()->
acknowledge(
session->getSessionInfo(),
consumer->getConsumerInfo(),
cmsMessages,
Connector::ACK_TYPE_CONSUMED );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}