package com.gemstone.gemfire.internal.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.TransactionInDoubtException;
import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.DistTXPrecommitMessage.DistTxPrecommitResponse;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData;
import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData;
import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.tx.DistClientTXStateStub;
import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
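/**
 * Coordinator-side transaction state proxy for distributed transactions ([DISTTX]). It keeps one
 * {@link DistTXCoordinatorInterface} per target member in {@link #target2realDeals} and drives a
 * two-phase protocol on commit: {@code doPrecommit()} sends precommit messages to all
 * participants and collects per-entry version information, and {@code doCommit()} ships that
 * information back with the commit messages. {@code rollback()} fans out rollback messages the
 * same way.
 *
 * <p>A minimal usage sketch, assuming distributed transactions are enabled through
 * {@code CacheTransactionManager#setDistributed(boolean)} (region and key names below are only
 * illustrative; this proxy itself is created internally by {@link TXManagerImpl}):
 *
 * <pre>{@code
 * Cache cache = new CacheFactory().create();
 * Region<String, String> orders =
 *     cache.<String, String> createRegionFactory(RegionShortcut.PARTITION).create("orders");
 * CacheTransactionManager txMgr = cache.getCacheTransactionManager();
 * txMgr.setDistributed(true); // route this member's transactions through the DISTTX path
 * txMgr.begin();
 * orders.put("k1", "v1");
 * txMgr.commit(); // on the coordinator this drives doPrecommit() and then doCommit()
 * }</pre>
 */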
public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/**
* A map from each distributed system member participating in this TX to either a
* {@link DistPeerTXStateStub} or a {@link DistTXStateOnCoordinator} (the latter when the
* TX coordinator is itself a data node)
*/
protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals = new HashMap<>();
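// Cached target member per replicated region, so all ops of this TX on a given RR go to the
// same member (see getRRTarget)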
private HashMap<LocalRegion, DistributedMember> rrTargets;
private Set<DistributedMember> txRemoteParticpants = null; // other than local
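// Per-region entry version info (region path -> entry states), gathered from precommit
// replies and shipped back with the commit messages (see populateEntryEventMap/List)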
protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
InternalDistributedMember clientMember) {
super(managerImpl, id, clientMember);
}
public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
boolean isjta) {
super(managerImpl, id, isjta);
}
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
*
* [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure
* these messages reach all participants
*/
@Override
public void commit() throws CommitConflictException {
boolean preserveTx = false;
boolean precommitResult = false;
try {
// create a map of secondaries (for PRs) / replicas (for RRs) to the stubs used to send
// the commit message to them
HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals = getSecondariesAndReplicasForTxOps();
// add it to the existing map and then send commit to all copies
target2realDeals.putAll(otherTargets2realDeals);
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.commit target2realDeals = "
+ target2realDeals);
}
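// Phase 1: precommit on all participants (local and remote); this also gathers
// per-entry version info into txEntryEventMap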
precommitResult = doPrecommit();
if (precommitResult) {
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.commit Going for commit ");
}
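// Phase 2: send the actual commit, carrying the entry version info collected during precommit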
boolean phase2commitDone = doCommit();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit "
+ (phase2commitDone ? "Done" : "Failed"));
}
// [DISTTX] TODO Handle this exception well
if (!phase2commitDone) {
throw new TransactionInDoubtException(
LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER
.toLocalizedString());
}
} else {
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.commit precommitResult = "
+ precommitResult);
}
}
} catch (CommitConflictException e) {
throw e;
} catch (UnsupportedOperationInTransactionException e) {
// fix for #42490
preserveTx = true;
throw e;
} finally {
// [DISTTX] TODO What about rollback exceptions?
if (!precommitResult) {
rollback();
}
inProgress = preserveTx;
if (this.synchRunnable != null) {
this.synchRunnable.abort();
}
}
}
/**
* Creates a map from every secondary (for PRs) / replica (for RRs) member touched by this
* TX to the stub used to send the commit message to it.
*/
private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
final GemFireCacheImpl cache = GemFireCacheImpl
.getExisting("getSecondariesAndReplicasForTxOps");
InternalDistributedMember currentNode = cache.getDistributedSystem()
.getDistributedMember();
HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals = new HashMap<>();
for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals
.entrySet()) {
DistributedMember originalTarget = e.getKey();
DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
ArrayList<DistTxEntryEvent> primaryTxOps = distPeerTxStateStub
.getPrimaryTransactionalOperations();
for (DistTxEntryEvent dtop : primaryTxOps) {
LocalRegion lr = dtop.getRegion();
// replicas or secondaries
Set<InternalDistributedMember> otherNodes = null;
if (lr instanceof PartitionedRegion) {
Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop
.getRegion()).getRegionAdvisor().getBucketOwners(
dtop.getKeyInfo().getBucketId());
allNodes.remove(originalTarget);
otherNodes = allNodes;
} else if (lr instanceof DistributedRegion) {
otherNodes = ((DistributedRegion) lr).getCacheDistributionAdvisor()
.adviseInitializedReplicates();
otherNodes.remove(originalTarget);
}
if (otherNodes != null) {
for (InternalDistributedMember dm : otherNodes) {
// check whether a stub for this target already exists due to another TX op on that node
DistTXCoordinatorInterface existingDistPeerTXStateStub = target2realDeals
.get(dm);
if (existingDistPeerTXStateStub == null) {
existingDistPeerTXStateStub = secondaryTarget2realDeals.get(dm);
if (existingDistPeerTXStateStub == null) {
DistTXCoordinatorInterface newTxStub = null;
if (currentNode.equals(dm)) {
// [DISTTX] TODO add a test case for this condition?
newTxStub = new DistTXStateOnCoordinator(this, false);
} else {
newTxStub = new DistPeerTXStateStub(this, dm,
onBehalfOfClientMember);
}
newTxStub.addSecondaryTransactionalOperations(dtop);
secondaryTarget2realDeals.put(dm, newTxStub);
} else {
existingDistPeerTXStateStub
.addSecondaryTransactionalOperations(dtop);
}
} else {
existingDistPeerTXStateStub
.addSecondaryTransactionalOperations(dtop);
}
}
}
}
}
return secondaryTarget2realDeals;
}
@Override
public void rollback() {
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.rollback Going for rollback ");
}
boolean finalResult = true;
final GemFireCacheImpl cache = GemFireCacheImpl
.getExisting("Applying Dist TX Rollback");
final DM dm = cache.getDistributionManager();
try {
// Create Tx Participants
Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
// create processor and rollback message
DistTXRollbackMessage.DistTxRollbackReplyProcessor processor = new DistTXRollbackMessage.DistTxRollbackReplyProcessor(
this.getTxId(), dm, txRemoteParticpants, target2realDeals);
// TODO [DISTTX] what's the ack threshold?
processor.enableSevereAlertProcessing();
final DistTXRollbackMessage rollbackMsg = new DistTXRollbackMessage(
this.getTxId(), this.onBehalfOfClientMember, processor);
// send rollback message to remote nodes
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals
.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"DistPeerTXStateStub", remoteTXStateStub.getClass()
.getSimpleName()));
}
try {
remoteTXStateStub.setRollbackMessage(rollbackMsg, dm);
remoteTXStateStub.rollback();
} finally {
remoteTXStateStub.setRollbackMessage(null, null);
remoteTXStateStub.finalCleanup();
}
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = "
+ remoteNode);
}
}
// Do rollback on local node
DistTXCoordinatorInterface localTXState = target2realDeals
.get(dm.getId());
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"DistTXStateOnCoordinator", localTXState.getClass()
.getSimpleName()));
}
localTXState.rollback();
boolean localResult = localTXState.getRollbackResponse();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.rollback local = "
+ dm.getId() + " ,result= " + localResult + " ,finalResult-old= "
+ finalResult);
}
finalResult = finalResult && localResult;
}
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualSend != null) {
// internalAfterIndividualSend.run();
// }
/*
* [DISTTX] TODO see how to handle exception
*/
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualCommitProcess != null) {
// // Testing callback
// internalAfterIndividualCommitProcess.run();
// }
{ // Wait for results
dm.getCancelCriterion().checkCancelInProgress(null);
processor.waitForPrecommitCompletion();
// [DISTTX] TODO Handle stats
// dm.getStats().incCommitWaits();
Map<DistributedMember, Boolean> remoteResults = processor
.getRollbackResponseMap();
for (Entry<DistributedMember, Boolean> e : remoteResults.entrySet()) {
DistributedMember target = e.getKey();
Boolean remoteResult = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger
.debug("DistTXStateProxyImplOnCoordinator.rollback target = "
+ target + " ,result= " + remoteResult
+ " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResult;
}
}
} finally {
inProgress = false;
if (this.synchRunnable != null) {
this.synchRunnable.abort();
}
}
/*
* [DISTTX] TODO Write similar method to take out exception
*
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
// checkDistributionReliability(distMap, processor);
// }
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.rollback finalResult= "
+ finalResult);
}
}
/**
* {@inheritDoc}
*/
@Override
public TXStateInterface getRealDeal(KeyInfo key, LocalRegion r) {
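// resolve the target member: primary bucket owner for partitioned/bucket regions,
// a cached per-region member for replicated regions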
if (r != null) {
target = null;
// wait for the region to be initialized; fixes bug 44652
r.waitOnInitialization(r.initializationLatchBeforeGetInitialImage);
if (r instanceof PartitionedRegion) {
target = getOwnerForKey(r, key);
} else if (r instanceof BucketRegion) {
target = ((BucketRegion) r).getBucketAdvisor().getPrimary();
// target = r.getMyId();
} else { //replicated region
target = getRRTarget(key, r);
}
this.realDeal = target2realDeals.get(target);
}
if (this.realDeal == null) {
// assert (r != null);
if (r == null) { // TODO: stop gap to get tests working
this.realDeal = new DistTXStateOnCoordinator(this, false);
target = this.txMgr.getDM().getId();
} else {
// Code to keep going forward
if (r.hasServerProxy()) {
// TODO [DISTTX] See what we need for client?
this.realDeal = new DistClientTXStateStub(this, target, r);
if (r.scope.isDistributed()) {
if (txDistributedClientWarningIssued.compareAndSet(false, true)) {
logger
.warn(LocalizedMessage
.create(
LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
r.getFullPath()));
}
}
} else {
// (r != null) code block above
if (target == null || target.equals(this.txMgr.getDM().getId())) {
this.realDeal = new DistTXStateOnCoordinator(this, false);
} else {
this.realDeal = new DistPeerTXStateStub(this, target,
onBehalfOfClientMember);
}
}
}
if (logger.isDebugEnabled()) {
logger
.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
this.realDeal, this.txMgr.getDM().getId(), this, target,
new Throwable());
}
target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
+ target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger
.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
}
@Override
public TXStateInterface getRealDeal(DistributedMember t) {
assert t != null;
this.target = t;
this.realDeal = target2realDeals.get(this.target);
if (this.realDeal == null) {
this.realDeal = new DistPeerTXStateStub(this, target,
onBehalfOfClientMember);
if (logger.isDebugEnabled()) {
logger
.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
this.realDeal, this.txMgr.getDM().getId());
}
if (!this.realDeal.isDistTx()
|| this.realDeal.isCreatedOnDistTxCoordinator()
|| !this.realDeal.isTxState()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED
.toLocalizedString("DistPeerTXStateStub", this.realDeal
.getClass().getSimpleName()));
}
target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
+ target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger
.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
}
/*
* [DISTTX] TODO Do some optimization
*/
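// Lazily picks and caches one member per replicated region so later ops in this TX reuse
// the same target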
private DistributedMember getRRTarget(KeyInfo key, LocalRegion r) {
if (this.rrTargets == null) {
this.rrTargets = new HashMap<>();
}
DistributedMember m = this.rrTargets.get(r);
if (m == null) {
m = getOwnerForKey(r, key);
this.rrTargets.put(r, m);
}
return m;
}
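/**
 * Computes (once) and caches the set of remote TX participants: every target in
 * target2realDeals except the local member.
 */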
private Set<DistributedMember> getTxRemoteParticpants(final DM dm) {
if (this.txRemoteParticpants == null) {
Set<DistributedMember> txParticpants = target2realDeals.keySet();
this.txRemoteParticpants = new HashSet<DistributedMember>(txParticpants);
// Remove local member from remote participant list
this.txRemoteParticpants.remove(dm.getId());
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.doPrecommit txParticpants = "
+ txParticpants
+ " ,txRemoteParticpants="
+ this.txRemoteParticpants + " ,originator=" + dm.getId());
}
}
return txRemoteParticpants;
}
private boolean doPrecommit() {
boolean finalResult = true;
final GemFireCacheImpl cache = GemFireCacheImpl
.getExisting("Applying Dist TX Precommit");
final DM dm = cache.getDistributionManager();
// Create Tx Participants
Set<DistributedMember> txParticpants = target2realDeals.keySet();
Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
// Determine if the set of VMs for any of the Regions for this TX has
// changed
DistributedRegion dr = null;
HashSet<LocalRegion> affectedRegions = new HashSet<LocalRegion>();
for (DistTXCoordinatorInterface distTXStateStub : target2realDeals.values()) {
affectedRegions.clear();
distTXStateStub.gatherAffectedRegions(affectedRegions, true, false);
for (LocalRegion lr : affectedRegions) {
if (lr.getScope().isLocal()) {
continue;
}
// [DISTTX] TODO what about PR?
if (lr instanceof DistributedRegion) {
dr = (DistributedRegion) lr;
CacheDistributionAdvisor adv = dr.getCacheDistributionAdvisor();
Set newRegionMemberView = adv.adviseTX();
if (!txParticpants.containsAll(newRegionMemberView)) {
logger
.warn(LocalizedMessage
.create(
LocalizedStrings.TXCommitMessage_NEW_MEMBERS_FOR_REGION_0_ORIG_LIST_1_NEW_LIST_2,
new Object[] { dr, txParticpants, newRegionMemberView }));
}
}
}
}
// create processor and precommit message
DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor = new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(
this.getTxId(), dm, txRemoteParticpants, target2realDeals);
// TODO [DISTTX] what's the ack threshold?
processor.enableSevereAlertProcessing();
final DistTXPrecommitMessage precommitMsg = new DistTXPrecommitMessage(
this.getTxId(), this.onBehalfOfClientMember, processor);
// send precommit message to remote nodes
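// the precommit reply from each remote node carries per-entry version info that is merged
// into txEntryEventMap below via populateEntryEventMap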
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals
.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"DistPeerTXStateStub", remoteTXStateStub.getClass()
.getSimpleName()));
}
try {
remoteTXStateStub.setPrecommitMessage(precommitMsg, dm);
remoteTXStateStub.precommit();
} finally {
remoteTXStateStub.setPrecommitMessage(null, null);
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = "
+ remoteNode);
}
}
// Do precommit on local node
TreeSet<String> sortedRegionName = new TreeSet<>();
DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"DistTXStateOnCoordinator", localTXState.getClass()
.getSimpleName()));
}
localTXState.precommit();
boolean localResult = localTXState.getPreCommitResponse();
TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap = new TreeMap<String, ArrayList<DistTxThinEntryState>>();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
if (localResult) {
localResult = ((DistTXStateOnCoordinator) localTXState)
.populateDistTxEntryStateList(entryStateSortedMap);
if (localResult) {
entryEventList = new ArrayList<ArrayList<DistTxThinEntryState>>(
entryStateSortedMap.values());
populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName);
}
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit local = "
+ dm.getId() + " ,entryEventList="
+ printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
+ printEntryEventMap(this.txEntryEventMap) + " ,result= "
+ localResult + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && localResult;
}
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualSend != null) {
// internalAfterIndividualSend.run();
// }
/*
* [DISTTX] TODO see how to handle exception
*/
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualCommitProcess != null) {
// // Testing callback
// internalAfterIndividualCommitProcess.run();
// }
{ // Wait for results
dm.getCancelCriterion().checkCancelInProgress(null);
processor.waitForPrecommitCompletion();
// [DISTTX] TODO Handle stats
// dm.getStats().incCommitWaits();
Map<DistributedMember, DistTxPrecommitResponse> remoteResults = processor
.getCommitResponseMap();
for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults
.entrySet()) {
DistributedMember target = e.getKey();
DistTxPrecommitResponse remoteResponse = e.getValue();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = remoteResponse.getDistTxEntryEventList();
populateEntryEventMap(target, entryEventList, sortedRegionName);
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = "
+ target
+ " ,sortedRegions"
+ sortedRegionName
+ " ,entryEventList="
+ printEntryEventList(entryEventList)
+ " ,txEntryEventMap="
+ printEntryEventMap(this.txEntryEventMap)
+ " ,result= "
+ remoteResponse.getCommitState()
+ " ,finalResult-old= "
+ finalResult);
}
finalResult = finalResult && remoteResponse.getCommitState();
}
}
/*
* [DISTTX] TODO Write similar method to take out exception
*
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
// checkDistributionReliability(distMap, processor);
// }
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.doPrecommit finalResult= "
+ finalResult);
}
return finalResult;
}
/*
* Handles the precommit response from one target: walks that target's list of per-region
* entry versions (in sorted region-name order) and fills txEntryEventMap.
*/
private void populateEntryEventMap(DistributedMember target,
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList,
TreeSet<String> sortedRegionName) {
if (this.txEntryEventMap == null) {
this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>();
}
DistTXCoordinatorInterface distTxIface = target2realDeals.get(target);
if (distTxIface.getPrimaryTransactionalOperations() != null
&& distTxIface.getPrimaryTransactionalOperations().size() > 0) {
sortedRegionName.clear();
distTxIface.gatherAffectedRegionsName(sortedRegionName, true, false);
if (sortedRegionName.size() != entryEventList.size()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("size of "
+ sortedRegionName.size() + " {" + sortedRegionName + "}"
+ " for target=" + target, entryEventList.size() + " {"
+ entryEventList + "}"));
}
int index = 0;
// Get region as per sorted order of region path
for (String rName : sortedRegionName) {
txEntryEventMap.put(rName, entryEventList.get(index++));
}
}
}
/*
* Populates, in sorted region-name order, the per-region entry-state lists to send with the
* commit message for this target.
*/
private void populateEntryEventList(DistributedMember target,
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList,
TreeSet<String> sortedRegionMap) {
DistTXCoordinatorInterface distTxItem = target2realDeals.get(target);
sortedRegionMap.clear();
distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true);
// Get region as per sorted order of region path
entryEventList.clear();
for (String rName : sortedRegionMap) {
ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap
.get(rName);
if (entryStates == null) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"entryStates for " + rName + " at target " + target, "null"));
}
entryEventList.add(entryStates);
}
}
/*
* [DISTTX] TODO - Handle result TXMessage
*/
private boolean doCommit() {
boolean finalResult = true;
final GemFireCacheImpl cache = GemFireCacheImpl
.getExisting("Applying Dist TX Commit");
final DM dm = cache.getDistributionManager();
// Create Tx Participants
Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
// create processor and commit message
DistTXCommitMessage.DistTxCommitReplyProcessor processor = new DistTXCommitMessage.DistTxCommitReplyProcessor(
this.getTxId(), dm, txRemoteParticpants, target2realDeals);
// TODO [DISTTX] what's the ack threshold?
processor.enableSevereAlertProcessing();
final DistTXCommitMessage commitMsg = new DistTXCommitMessage(
this.getTxId(), this.onBehalfOfClientMember, processor);
// send commit message to remote nodes
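// for each target, look up its per-region entry states (gathered during precommit) and
// piggyback them on the commit message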
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>();
TreeSet<String> sortedRegionName = new TreeSet<>();
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals
.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"DistPeerTXStateStub", remoteTXStateStub.getClass()
.getSimpleName()));
}
try {
populateEntryEventList(remoteNode, entryEventList,
sortedRegionName);
commitMsg.setEntryStateList(entryEventList);
remoteTXStateStub.setCommitMessage(commitMsg, dm);
remoteTXStateStub.commit();
} finally {
remoteTXStateStub.setCommitMessage(null, null);
remoteTXStateStub.finalCleanup();
}
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = "
+ remoteNode
+ " ,sortedRegions="
+ sortedRegionName
+ " ,entryEventList="
+ printEntryEventList(entryEventList)
+ " ,txEntryEventMap="
+ printEntryEventMap(this.txEntryEventMap));
}
}
// Do commit on local node
DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"DistTXStateOnCoordinator", localTXState.getClass()
.getSimpleName()));
}
populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
localTXState.commit();
TXCommitMessage localResultMsg = localTXState.getCommitMessage();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doCommit local = "
+ dm.getId() + " ,sortedRegions=" + sortedRegionName
+ " ,entryEventList=" + printEntryEventList(entryEventList)
+ " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap)
+ " ,result= " + localResultMsg != null + " ,finalResult-old= "
+ finalResult);
}
finalResult = finalResult && (localResultMsg != null);
}
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualSend != null) {
// internalAfterIndividualSend.run();
// }
/*
* [DISTTX] TODO see how to handle exception
*/
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualCommitProcess != null) {
// // Testing callback
// internalAfterIndividualCommitProcess.run();
// }
{ // Wait for results
dm.getCancelCriterion().checkCancelInProgress(null);
processor.waitForPrecommitCompletion();
// [DISTTX] TODO Handle stats
dm.getStats().incCommitWaits();
Map<DistributedMember, TXCommitMessage> remoteResults = processor
.getCommitResponseMap();
for (Entry<DistributedMember, TXCommitMessage> e : remoteResults
.entrySet()) {
DistributedMember target = e.getKey();
TXCommitMessage remoteResultMsg = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger
.debug("DistTXStateProxyImplOnCoordinator.doCommit got results from target = "
+ target + " ,result= " + remoteResultMsg != null
+ " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResultMsg != null;
}
}
/*
* [DISTTX] TODO Write similar method to take out exception
*
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
// checkDistributionReliability(distMap, processor);
// }
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doCommit finalResult= "
+ finalResult);
}
return finalResult;
}
/**
* For distributed transactions, this divides the user's putAll operation into
* multiple per-bucket putAll ops (each holding the entries destined for that bucket) and
* then fires each one using the appropriate TXStateStub (for the target that hosts
* the corresponding bucket)
*/
@Override
public void postPutAll(DistributedPutAllOperation putallOp,
VersionedObjectList successfulPuts, LocalRegion region) {
if (putallOp.putAllData.length == 0) {
return;
}
if (region instanceof DistributedRegion) {
super.postPutAll(putallOp, successfulPuts, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
// #43651
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll "
+ "processing putAll op for region {}, size of putAllOp "
+ "is {}", region, putallOp.putAllData.length);
}
//map of bucketId to putall op for this bucket
HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap =
new HashMap<Integer, DistributedPutAllOperation>();
//map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
new HashMap<Integer, DistTXCoordinatorInterface>();
//separate the putall op per bucket
for (int i=0; i<putallOp.putAllData.length; i++) {
assert (putallOp.putAllData[i] != null);
Object key = putallOp.putAllData[i].key;
int bucketId = putallOp.putAllData[i].getBucketId();
DistributedPutAllOperation putAllForBucket =
bucketToPutallMap.get(bucketId);
if (putAllForBucket == null) {
EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
Operation.PUTALL_CREATE, key,
putallOp.putAllData[i].getValue());
event.setEventId(putallOp.putAllData[i].getEventID());
putAllForBucket = new DistributedPutAllOperation(
event, putallOp.putAllDataSize, putallOp.isBridgeOp);
bucketToPutallMap.put(bucketId, putAllForBucket);
}
putAllForBucket.addEntry(putallOp.putAllData[i]);
KeyInfo ki = new KeyInfo(key, null, null);
DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, region);
bucketToTxStateStubMap.put(bucketId, tsi);
}
// fire a putAll operation for each bucket using appropriate TXStateStub
// (for target that host this bucket)
// [DISTTX] [TODO] Perf: Can this be further optimized?
// This sends putAll in a loop to each target bucket (and waits for ack)
// one after another. Could we send the respective putAll messages to all
// targets using the same reply processor and wait on it once?
for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap
.entrySet()) {
Integer bucketId = e.getKey();
DistTXCoordinatorInterface dtsi = e.getValue();
DistributedPutAllOperation putAllForBucket = bucketToPutallMap
.get(bucketId);
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll processing"
+ " putAll for ##bucketId = {}, ##txStateStub = {}, "
+ "##putAllOp = {}"
, bucketId, dtsi, putAllForBucket);
}
dtsi.postPutAll(putAllForBucket, successfulPuts, region);
}
}
}
/**
* For distributed transactions, this divides the user's removeAll operation
* into multiple per-bucket removeAll ops (each holding the entries to be removed from
* that bucket) and then fires each one using the appropriate TXStateStub (for the
* target that hosts the corresponding bucket)
*/
@Override
public void postRemoveAll(DistributedRemoveAllOperation op,
VersionedObjectList successfulOps, LocalRegion region) {
if (op.removeAllData.length == 0) {
return;
}
if (region instanceof DistributedRegion) {
super.postRemoveAll(op, successfulOps, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
// #43651
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll "
+ "processing removeAll op for region {}, size of removeAll "
+ "is {}", region, op.removeAllDataSize);
}
//map of bucketId to removeAll op for this bucket
HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap =
new HashMap<Integer, DistributedRemoveAllOperation>();
//map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
new HashMap<Integer, DistTXCoordinatorInterface>();
//separate the removeAll op per bucket
for (int i=0; i<op.removeAllData.length; i++) {
assert (op.removeAllData[i] != null);
Object key = op.removeAllData[i].key;
int bucketId = op.removeAllData[i].getBucketId();
DistributedRemoveAllOperation removeAllForBucket =
bucketToRemoveAllMap.get(bucketId);
if (removeAllForBucket == null) {
EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
Operation.REMOVEALL_DESTROY, key, null);
removeAllForBucket = new DistributedRemoveAllOperation(
event, op.removeAllDataSize, op.isBridgeOp);
bucketToRemoveAllMap.put(bucketId, removeAllForBucket);
}
removeAllForBucket.addEntry(op.removeAllData[i]);
KeyInfo ki = new KeyInfo(key, null, null);
DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, region);
bucketToTxStateStubMap.put(bucketId, tsi);
}
// fire a removeAll operation for each bucket using appropriate TXStateStub
// (for target that host this bucket)
// [DISTTX] [TODO] Perf: Can this be further optimized?
// This sends removeAll in a loop to each target bucket (and waits for ack)
// one after another. Could we send the respective removeAll messages to all
// targets using the same reply processor and wait on it once?
for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap
.entrySet()) {
Integer bucketId = e.getKey();
DistTXCoordinatorInterface dtsi = e.getValue();
DistributedRemoveAllOperation removeAllForBucket = bucketToRemoveAllMap
.get(bucketId);
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
+ " removeAll for ##bucketId = {}, ##txStateStub = {}, "
+ "##removeAllOp = {}"
, bucketId, dtsi, removeAllForBucket);
}
dtsi.postRemoveAll(removeAllForBucket, successfulOps, region);
}
}
}
@Override
public boolean isCreatedOnDistTxCoordinator() {
return true;
}
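/**
 * Debug helper that formats the region-path to entry-state map for log messages.
 */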
public static String printEntryEventMap(
HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(txRegionVersionsMap.size());
str.append(")=[ ");
for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap
.entrySet()) {
str.append(" {").append(entry.getKey());
str.append(":").append("size(").append(entry.getValue().size())
.append(")");
str.append("=").append(entry.getValue()).append("}, ");
}
str.append(" } ");
return str.toString();
}
public static String printEntryEventList(
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(entryEventList.size());
str.append(")=[ ");
for (ArrayList<DistTxThinEntryState> entry : entryEventList) {
str.append(" ( ");
str.append(entry.size());
str.append(" )={").append(entry);
str.append(" } ");
}
str.append(" ] ");
return str.toString();
}
/*
* Never returns null: falls back to the local distributed member when the region reports no
* owner for the key.
*/
public DistributedMember getOwnerForKey(LocalRegion r, KeyInfo key) {
DistributedMember m = r.getOwnerForKey(key);
if (m == null) {
GemFireCacheImpl cache = GemFireCacheImpl.getExisting("getOwnerForKey");
m = cache.getDistributedSystem().getDistributedMember();
}
return m;
}
}