blob: e291b1d3c8bcf68848d68836e6bc9a0ae7b1e7a8 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2015 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/*
* CacheTransactionManager2PCImpl.cpp
*
* Created on: 13-Nov-2015
* Author: sshcherbakov
*/
#include "../gf_types.hpp"
#include "InternalCacheTransactionManager2PCImpl.hpp"
#include "CacheTransactionManagerImpl.hpp"
#include "../TransactionId.hpp"
#include "../ExceptionTypes.hpp"
#include "TcrMessage.hpp"
#include "ThinClientPoolDM.hpp"
#include "CacheRegionHelper.hpp"
#include "../PoolManager.hpp"
#include "TXCleaner.hpp"
namespace gemfire {
/**
 * Builds the two-phase-commit transaction manager for the given cache.
 * All construction work is delegated to CacheTransactionManagerImpl; this
 * class adds no initialization of its own.
 *
 * @param cache cache pointer, forwarded unchanged to the base class.
 */
InternalCacheTransactionManager2PCImpl::InternalCacheTransactionManager2PCImpl(
    Cache* cache)
  : CacheTransactionManagerImpl(cache)
{}
/**
 * Destructor. The body is empty: this class performs no cleanup of its own.
 */
InternalCacheTransactionManager2PCImpl::~InternalCacheTransactionManager2PCImpl()
{
  // Intentionally empty.
}
/**
 * First phase of the two-phase commit.
 *
 * Sends a TX_SYNCHRONIZATION / BEFORE_COMMIT request for the calling thread's
 * transaction and, when the server answers with REPLY, marks the transaction
 * state as prepared so that afterCompletion() takes the 2PC path.
 *
 * Behaviour by outcome:
 *  - no transaction state on this thread: GfErrTypeThrowException is invoked
 *    with GF_CACHE_ILLEGAL_STATE_EXCEPTION;
 *  - getDM() returns NULL: returns without contacting the server;
 *  - sendSyncRequest() fails: GfErrTypeThrowException is invoked with the
 *    returned error code;
 *  - reply is EXCEPTION, REQUEST_DATA_ERROR or an unknown type: a TXCleaner is
 *    placed in scope and GfErrTypeThrowException is invoked.
 *
 * Every gemfire::Exception that leaves this function is logged first and then
 * rethrown with its original dynamic type.
 */
void InternalCacheTransactionManager2PCImpl::prepare()
{
  try
  {
    TSSTXStateWrapper* txStateWrapper = TSSTXStateWrapper::s_gemfireTSSTXState;
    TXState* txState = txStateWrapper->getTXState();
    if (txState == NULL)
    {
      GfErrTypeThrowException(
        "Transaction is null, cannot prepare of a null transaction",
        GF_CACHE_ILLEGAL_STATE_EXCEPTION);
    }

    ThinClientPoolDM* tcr_dm = getDM();
    // A NULL distribution manager means no cache operation was performed
    // since tx->begin(), so there is nothing to synchronize with the server.
    // NOTE(review): no TXCleaner is in scope at this point, so this early
    // return does not touch the thread's transaction state (an earlier
    // comment here claimed a TXCleaner destructor would run).
    if (tcr_dm == NULL)
    {
      return;
    }

    TcrMessage requestCommitBefore(
      TcrMessage::TX_SYNCHRONIZATION,
      BEFORE_COMMIT,
      txState->getTransactionId()->getId(),
      STATUS_COMMITTED);
    TcrMessage replyCommitBefore;

    GfErrType err = tcr_dm->sendSyncRequest(requestCommitBefore, replyCommitBefore);
    if (err != GF_NOERR)
    {
      GfErrTypeThrowException("Error while prepare", err);
    }
    else
    {
      switch (replyCommitBefore.getMessageType())
      {
        case TcrMessage::REPLY:
          // Server accepted the prepare; remember it for afterCompletion().
          txState->setPrepared();
          break;

        case TcrMessage::EXCEPTION:
        {
          // The TXCleaner is declared before the throw so that its
          // destructor runs while the exception unwinds this scope.
          TXCleaner txCleaner(this);
          const char* exceptionMsg = replyCommitBefore.getException();
          err = ThinClientRegion::handleServerException(
            "CacheTransactionManager::prepare",
            exceptionMsg);
          GfErrTypeThrowException("Commit Failed in prepare", err);
          break;
        }

        case TcrMessage::REQUEST_DATA_ERROR:
        {
          TXCleaner txCleaner(this);
          GfErrTypeThrowException("Commit Failed in prepare", GF_COMMIT_CONFLICT_EXCEPTION);
          break;
        }

        default:
        {
          TXCleaner txCleaner(this);
          LOGERROR("Unknown message type in prepare reply %d", replyCommitBefore.getMessageType());
          GfErrTypeThrowException("Commit Failed in prepare", GF_MSG);
          break;
        }
      }
    }
  }
  catch (const Exception& ex)
  {
    LOGERROR("Unexpected exception during commit in prepare %s", ex.getMessage());
    // Bare rethrow: `throw ex;` would copy-construct a plain Exception and
    // slice off the derived exception type that callers may want to catch.
    throw;
  }
}
void InternalCacheTransactionManager2PCImpl::commit()
{
LOGFINEST("Committing");
this->afterCompletion(STATUS_COMMITTED);
}
void InternalCacheTransactionManager2PCImpl::rollback()
{
LOGFINEST("Rolling back");
this->afterCompletion(STATUS_ROLLEDBACK);
}
void InternalCacheTransactionManager2PCImpl::afterCompletion(int32_t status)
{
try {
TSSTXStateWrapper* txStateWrapper = TSSTXStateWrapper::s_gemfireTSSTXState;
TXState* txState = txStateWrapper->getTXState();
if(txState == NULL)
{
GfErrTypeThrowException(
"Transaction is null, cannot commit a null transaction",
GF_CACHE_ILLEGAL_STATE_EXCEPTION);
}
ThinClientPoolDM* tcr_dm = getDM();
//This is for the case when no cache operation/s is performed between tx->begin() and tx->commit()/rollback(),
//simply return without sending COMMIT message to server. tcr_dm is NULL implies no cache operation is performed.
//Theres no need to call txCleaner.clean(); here, because TXCleaner destructor is called which cleans ThreadLocal.
if (tcr_dm == NULL) {
return;
}
if( !txState->isPrepared() ) {
// Fallback to deafult 1PC commit
// The inherited 1PC implmentation clears the transaction state
switch( status ) {
case STATUS_COMMITTED:
CacheTransactionManagerImpl::commit();
break;
case STATUS_ROLLEDBACK:
CacheTransactionManagerImpl::rollback();
break;
default:
GfErrTypeThrowException("Unknown command", GF_CACHE_ILLEGAL_STATE_EXCEPTION);
}
return;
}
// In 2PC we always clear the transaction state
TXCleaner txCleaner(this);
TcrMessage requestCommitAfter(
TcrMessage::TX_SYNCHRONIZATION,
AFTER_COMMIT,
txState->getTransactionId()->getId(),
status);
TcrMessage replyCommitAfter;
GfErrType err = tcr_dm->sendSyncRequest(requestCommitAfter, replyCommitAfter);
if(err != GF_NOERR)
{
GfErrTypeThrowException("Error in 2PC commit", err);
}
else
{
switch (replyCommitAfter.getMessageType())
{
case TcrMessage::RESPONSE:
{
TXCommitMessagePtr commit = staticCast<TXCommitMessagePtr>(replyCommitAfter.getValue());
if( commit.ptr() != NULL ) // e.g. when afterCompletion(STATUS_ROLLEDBACK) called
{
txCleaner.clean();
commit->apply(this->getCache());
}
break;
}
case TcrMessage::EXCEPTION:
{
const char* exceptionMsg = replyCommitAfter.getException();
err = ThinClientRegion::handleServerException(
"CacheTransactionManager::afterCompletion",
exceptionMsg);
GfErrTypeThrowException("2PC Commit Failed", err);
break;
}
case TcrMessage::REQUEST_DATA_ERROR:
GfErrTypeThrowException("2PC Commit Failed", GF_COMMIT_CONFLICT_EXCEPTION);
break;
default:
GfErrTypeThrowException("2PC Commit Failed", GF_MSG);
break;
}
}
}
catch (const Exception& ex)
{
LOGERROR("Unexpected exception during completing transaction %s", ex.getMessage());
throw ex;
}
}
}