blob: 8aa76fa3e4b923aabd6f7e698e510b990749a64b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQTransaction.h"
#include <activemq/exceptions/NullPointerException.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConsumer.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/util/Integer.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::concurrent;
using namespace activemq::exceptions;
////////////////////////////////////////////////////////////////////////////////
ActiveMQTransaction::ActiveMQTransaction( ActiveMQConnection* connection,
ActiveMQSession* session,
const Properties& properties )
{
try
{
if( connection == NULL || session == NULL )
{
throw NullPointerException(
__FILE__, __LINE__,
"ActiveMQTransaction::ActiveMQTransaction - "
"Initialized with a NULL connection data");
}
// Store State Data
this->connection = connection;
this->session = session;
// convert from property Strings to int.
maxRedeliveries = Integer::parseInt(
properties.getProperty( "transaction.maxRedeliveryCount", "5" ) );
// Start a new Transaction
transactionInfo = connection->getConnectionData()->
getConnector()->startTransaction( session->getSessionInfo() );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQTransaction::~ActiveMQTransaction(void)
{
try
{
// Inform the connector we are rolling back before we close so that
// the provider knows we didn't complete this transaction
connection->getConnectionData()->getConnector()->
rollback( transactionInfo, session->getSessionInfo() );
// Clean up
clearTransaction();
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::clearTransaction(void)
{
try
{
if( transactionInfo != NULL )
{
// Dispose of the ProducerInfo
transactionInfo->close();
delete transactionInfo;
}
synchronized( &rollbackLock )
{
// If there are any messages that are being transacted, then
// they die once and for all here.
RollbackMap::iterator itr = rollbackMap.begin();
for( ; itr != rollbackMap.end(); ++itr )
{
MessageList::iterator msgItr = itr->second.begin();
for( ; msgItr != itr->second.end(); ++msgItr )
{
delete *msgItr;
}
}
rollbackMap.clear();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
ActiveMQConsumer* consumer )
{
synchronized( &rollbackLock )
{
// Store in the Multi Map
rollbackMap[consumer].push_back( message );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::removeFromTransaction(
ActiveMQConsumer* consumer )
{
try
{
// Delete all the messages, then remove the consumer's entry from
// the Rollback Map.
synchronized( &rollbackLock )
{
RollbackMap::iterator rb_itr = rollbackMap.find( consumer );
if( rb_itr == rollbackMap.end() )
{
return;
}
MessageList::iterator itr = rb_itr->second.begin();
for( ; itr != rollbackMap[consumer].end(); ++itr )
{
delete *itr;
}
// Erase the entry from the map
rollbackMap.erase( consumer );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::removeFromTransaction( long long consumerId ) {
try {
// Delete all the messages, then remove the consumer's entry from
// the Rollback Map.
synchronized( &rollbackLock )
{
RollbackMap::iterator iter = rollbackMap.begin();
for( ; iter != rollbackMap.end(); ++iter ) {
long long id = iter->first->getConsumerInfo()->getConsumerId();
if( id == consumerId ) {
removeFromTransaction( iter->first );
return;
}
}
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::commit() throw ( exceptions::ActiveMQException )
{
try
{
if( this->transactionInfo == NULL )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQTransaction::begin - "
"Commit called before transaction was started.");
}
// Stop any deliveries
session->stop();
// Commit the current Transaction
connection->getConnectionData()->getConnector()->
commit( transactionInfo, session->getSessionInfo() );
// Clean out the Transaction
clearTransaction();
// Start a new Transaction
transactionInfo = connection->getConnectionData()->
getConnector()->startTransaction( session->getSessionInfo() );
// Stop any deliveries
session->start();
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::rollback() throw ( exceptions::ActiveMQException )
{
try
{
if( this->transactionInfo == NULL )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQTransaction::rollback - "
"Rollback called before transaction was started.");
}
// Stop any Deliveries
session->stop();
// Rollback the Transaction
connection->getConnectionData()->getConnector()->
rollback( transactionInfo, session->getSessionInfo() );
// Dispose of the TransactionInfo
transactionInfo->close();
delete transactionInfo;
// Start a new Transaction
transactionInfo = connection->getConnectionData()->
getConnector()->startTransaction( session->getSessionInfo() );
// Start Deliveries
session->start();
// Roolback the messages to the Session, since we have the lock on the
// rollbackLock, then no message will added to the transaction unitll we
// are done processing all the messages that we to redeliver and the map
// is cleared.
synchronized( &rollbackLock )
{
RollbackMap::iterator itr = rollbackMap.begin();
for(; itr != rollbackMap.end(); ++itr)
{
redeliverMessages( itr->first, itr->second );
}
// Clear the map. Ownership of the messages is now handed off
// to the rollback tasks.
rollbackMap.clear();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::redeliverMessages( ActiveMQConsumer* consumer,
MessageList& messages )
throw ( exceptions::ActiveMQException ) {
try
{
MessageList::iterator itr = messages.begin();
for( ; itr != messages.end(); ++itr )
{
ActiveMQMessage* message = *itr;
message->setRedeliveryCount( message->getRedeliveryCount() + 1 );
if( message->getRedeliveryCount() >= maxRedeliveries )
{
// Poison Ack the Message, we give up processing this one
connection->getConnectionData()->getConnector()->
acknowledge(
session->getSessionInfo(),
consumer->getConsumerInfo(),
dynamic_cast< Message* >( message ),
Connector::ACK_TYPE_POISON );
// Won't redeliver this so we kill it here.
delete message;
return;
}
DispatchData data( consumer->getConsumerInfo(), message );
session->dispatch( data );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}