| package com.gemstone.gemfire.internal.cache; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import com.gemstone.gemfire.cache.CommitConflictException; |
| import com.gemstone.gemfire.cache.Operation; |
| import com.gemstone.gemfire.cache.TransactionInDoubtException; |
| import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.internal.DM; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.cache.DistTXPrecommitMessage.DistTxPrecommitResponse; |
| import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData; |
| import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData; |
| import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList; |
| import com.gemstone.gemfire.internal.cache.tx.DistClientTXStateStub; |
| import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| |
| public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { |
| |
| /** |
| * A map of distributed system member to either {@link DistPeerTXStateStub} or |
| * {@link DistTXStateOnCoordinator} (in case of TX coordinator is also a data |
| * node) |
| */ |
| protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals = new HashMap<>(); |
| private HashMap<LocalRegion, DistributedMember> rrTargets; |
| private Set<DistributedMember> txRemoteParticpants = null; // other than local |
| protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null; |
| |
| public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id, |
| InternalDistributedMember clientMember) { |
| super(managerImpl, id, clientMember); |
| } |
| |
| public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id, |
| boolean isjta) { |
| super(managerImpl, id, isjta); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit() |
| * |
| * [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure |
| * these messages reach all |
| */ |
| @Override |
| public void commit() throws CommitConflictException { |
| boolean preserveTx = false; |
| boolean precommitResult = false; |
| try { |
| // create a map of secondary(for PR) / replica(for RR) to stubs to send |
| // commit message to those |
| HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals = getSecondariesAndReplicasForTxOps(); |
| // add it to the existing map and then send commit to all copies |
| target2realDeals.putAll(otherTargets2realDeals); |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.commit target2realDeals = " |
| + target2realDeals); |
| } |
| |
| precommitResult = doPrecommit(); |
| if (precommitResult) { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.commit Going for commit "); |
| } |
| boolean phase2commitDone = doCommit(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit " |
| + (phase2commitDone ? "Done" : "Failed")); |
| } |
| // [DISTTX] TODO Handle this exception well |
| if (!phase2commitDone) { |
| throw new TransactionInDoubtException( |
| LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER |
| .toLocalizedString()); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.commit precommitResult = " |
| + precommitResult); |
| } |
| } |
| } catch (CommitConflictException e) { |
| throw e; |
| } catch (UnsupportedOperationInTransactionException e) { |
| // fix for #42490 |
| preserveTx = true; |
| throw e; |
| } finally { |
| // [DISTTX] TODO What about rollback exceptions? |
| if (!precommitResult) { |
| rollback(); |
| } |
| |
| inProgress = preserveTx; |
| if (this.synchRunnable != null) { |
| this.synchRunnable.abort(); |
| } |
| } |
| } |
| |
| /** |
| * creates a map of all secondaries(for PR) / replicas(for RR) to stubs to |
| * send commit message to those |
| */ |
| private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() { |
| final GemFireCacheImpl cache = GemFireCacheImpl |
| .getExisting("getSecondariesAndReplicasForTxOps"); |
| InternalDistributedMember currentNode = cache.getDistributedSystem() |
| .getDistributedMember(); |
| |
| HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals = new HashMap<>(); |
| for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals |
| .entrySet()) { |
| DistributedMember originalTarget = e.getKey(); |
| DistTXCoordinatorInterface distPeerTxStateStub = e.getValue(); |
| |
| ArrayList<DistTxEntryEvent> primaryTxOps = distPeerTxStateStub |
| .getPrimaryTransactionalOperations(); |
| for (DistTxEntryEvent dtop : primaryTxOps) { |
| LocalRegion lr = dtop.getRegion(); |
| // replicas or secondaries |
| Set<InternalDistributedMember> otherNodes = null; |
| if (lr instanceof PartitionedRegion) { |
| Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop |
| .getRegion()).getRegionAdvisor().getBucketOwners( |
| dtop.getKeyInfo().getBucketId()); |
| allNodes.remove(originalTarget); |
| otherNodes = allNodes; |
| } else if (lr instanceof DistributedRegion) { |
| otherNodes = ((DistributedRegion) lr).getCacheDistributionAdvisor() |
| .adviseInitializedReplicates(); |
| otherNodes.remove(originalTarget); |
| } |
| |
| if (otherNodes != null) { |
| for (InternalDistributedMember dm : otherNodes) { |
| // whether the target already exists due to other Tx op on the node |
| DistTXCoordinatorInterface existingDistPeerTXStateStub = target2realDeals |
| .get(dm); |
| if (existingDistPeerTXStateStub == null) { |
| existingDistPeerTXStateStub = secondaryTarget2realDeals.get(dm); |
| if (existingDistPeerTXStateStub == null) { |
| DistTXCoordinatorInterface newTxStub = null; |
| if (currentNode.equals(dm)) { |
| // [DISTTX] TODO add a test case for this condition? |
| newTxStub = new DistTXStateOnCoordinator(this, false); |
| } else { |
| newTxStub = new DistPeerTXStateStub(this, dm, |
| onBehalfOfClientMember); |
| } |
| newTxStub.addSecondaryTransactionalOperations(dtop); |
| secondaryTarget2realDeals.put(dm, newTxStub); |
| } else { |
| existingDistPeerTXStateStub |
| .addSecondaryTransactionalOperations(dtop); |
| } |
| } else { |
| existingDistPeerTXStateStub |
| .addSecondaryTransactionalOperations(dtop); |
| } |
| } |
| } |
| } |
| } |
| return secondaryTarget2realDeals; |
| } |
| |
| @Override |
| public void rollback() { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.rollback Going for rollback "); |
| } |
| |
| boolean finalResult = false; |
| final GemFireCacheImpl cache = GemFireCacheImpl |
| .getExisting("Applying Dist TX Rollback"); |
| final DM dm = cache.getDistributionManager(); |
| try { |
| // Create Tx Participants |
| Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm); |
| |
| // create processor and rollback message |
| DistTXRollbackMessage.DistTxRollbackReplyProcessor processor = new DistTXRollbackMessage.DistTxRollbackReplyProcessor( |
| this.getTxId(), dm, txRemoteParticpants, target2realDeals); |
| // TODO [DISTTX} whats ack threshold? |
| processor.enableSevereAlertProcessing(); |
| final DistTXRollbackMessage rollbackMsg = new DistTXRollbackMessage( |
| this.getTxId(), this.onBehalfOfClientMember, processor); |
| |
| // send rollback message to remote nodes |
| for (DistributedMember remoteNode : txRemoteParticpants) { |
| DistTXCoordinatorInterface remoteTXStateStub = target2realDeals |
| .get(remoteNode); |
| if (remoteTXStateStub.isTxState()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString( |
| "DistPeerTXStateStub", remoteTXStateStub.getClass() |
| .getSimpleName())); |
| } |
| try { |
| remoteTXStateStub.setRollbackMessage(rollbackMsg, dm); |
| remoteTXStateStub.rollback(); |
| } finally { |
| remoteTXStateStub.setRollbackMessage(null, null); |
| remoteTXStateStub.finalCleanup(); |
| } |
| if (logger.isDebugEnabled()) { // TODO - make this trace level |
| logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = " |
| + remoteNode); |
| } |
| } |
| |
| // Do rollback on local node |
| DistTXCoordinatorInterface localTXState = target2realDeals |
| .get(dm.getId()); |
| if (localTXState != null) { |
| if (!localTXState.isTxState()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString( |
| "DistTXStateOnCoordinator", localTXState.getClass() |
| .getSimpleName())); |
| } |
| localTXState.rollback(); |
| boolean localResult = localTXState.getRollbackResponse(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.rollback local = " |
| + dm.getId() + " ,result= " + localResult + " ,finalResult-old= " |
| + finalResult); |
| } |
| finalResult = finalResult && localResult; |
| } |
| |
| /* |
| * [DISTTX] TODO Any test hooks |
| */ |
| // if (internalAfterIndividualSend != null) { |
| // internalAfterIndividualSend.run(); |
| // } |
| |
| /* |
| * [DISTTX] TODO see how to handle exception |
| */ |
| |
| /* |
| * [DISTTX] TODO Any test hooks |
| */ |
| // if (internalAfterIndividualCommitProcess != null) { |
| // // Testing callback |
| // internalAfterIndividualCommitProcess.run(); |
| // } |
| |
| { // Wait for results |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| processor.waitForPrecommitCompletion(); |
| |
| // [DISTTX} TODO Handle stats |
| // dm.getStats().incCommitWaits(); |
| |
| Map<DistributedMember, Boolean> remoteResults = processor |
| .getRollbackResponseMap(); |
| for (Entry<DistributedMember, Boolean> e : remoteResults.entrySet()) { |
| DistributedMember target = e.getKey(); |
| Boolean remoteResult = e.getValue(); |
| if (logger.isDebugEnabled()) { // TODO - make this trace level |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.rollback target = " |
| + target + " ,result= " + remoteResult |
| + " ,finalResult-old= " + finalResult); |
| } |
| finalResult = finalResult && remoteResult; |
| } |
| } |
| |
| } finally { |
| inProgress = false; |
| if (this.synchRunnable != null) { |
| this.synchRunnable.abort(); |
| } |
| } |
| |
| /* |
| * [DISTTX] TODO Write similar method to take out exception |
| * |
| * [DISTTX] TODO Handle Reliable regions |
| */ |
| // if (this.hasReliableRegions) { |
| // checkDistributionReliability(distMap, processor); |
| // } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.rollback finalResult= " |
| + finalResult); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public TXStateInterface getRealDeal(KeyInfo key, LocalRegion r) { |
| if (r != null) { |
| target = null; |
| // wait for the region to be initialized fixes bug 44652 |
| r.waitOnInitialization(r.initializationLatchBeforeGetInitialImage); |
| if (r instanceof PartitionedRegion) { |
| target = getOwnerForKey(r, key); |
| } else if (r instanceof BucketRegion) { |
| target = ((BucketRegion) r).getBucketAdvisor().getPrimary(); |
| // target = r.getMyId(); |
| } else { //replicated region |
| target = getRRTarget(key, r); |
| } |
| this.realDeal = target2realDeals.get(target); |
| } |
| if (this.realDeal == null) { |
| // assert (r != null); |
| if (r == null) { // TODO: stop gap to get tests working |
| this.realDeal = new DistTXStateOnCoordinator(this, false); |
| target = this.txMgr.getDM().getId(); |
| } else { |
| // Code to keep going forward |
| if (r.hasServerProxy()) { |
| // TODO [DISTTX] See what we need for client? |
| this.realDeal = new DistClientTXStateStub(this, target, r); |
| if (r.scope.isDistributed()) { |
| if (txDistributedClientWarningIssued.compareAndSet(false, true)) { |
| logger |
| .warn(LocalizedMessage |
| .create( |
| LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX, |
| r.getFullPath())); |
| } |
| } |
| } else { |
| // (r != null) code block above |
| if (target == null || target.equals(this.txMgr.getDM().getId())) { |
| this.realDeal = new DistTXStateOnCoordinator(this, false); |
| } else { |
| this.realDeal = new DistPeerTXStateStub(this, target, |
| onBehalfOfClientMember); |
| } |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug( |
| "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}", |
| this.realDeal, this.txMgr.getDM().getId(), this, target, |
| new Throwable()); |
| } |
| target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal); |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = " |
| + target2realDeals); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug( |
| "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}", |
| this.realDeal, this, target, target2realDeals); |
| } |
| } |
| return this.realDeal; |
| } |
| |
| @Override |
| public TXStateInterface getRealDeal(DistributedMember t) { |
| assert t != null; |
| this.realDeal = target2realDeals.get(target); |
| if (this.realDeal == null) { |
| this.target = t; |
| this.realDeal = new DistPeerTXStateStub(this, target, |
| onBehalfOfClientMember); |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug( |
| "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}", |
| this.realDeal, this.txMgr.getDM().getId()); |
| } |
| if (!this.realDeal.isDistTx() |
| || this.realDeal.isCreatedOnDistTxCoordinator() |
| || !this.realDeal.isTxState()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED |
| .toLocalizedString("DistPeerTXStateStub", this.realDeal |
| .getClass().getSimpleName())); |
| } |
| target2realDeals.put(target, (DistPeerTXStateStub) realDeal); |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = " |
| + target2realDeals); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug( |
| "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}", |
| this.realDeal, this, target, target2realDeals); |
| } |
| } |
| return this.realDeal; |
| } |
| |
| /* |
| * [DISTTX] TODO Do some optimization |
| */ |
| private DistributedMember getRRTarget(KeyInfo key, LocalRegion r) { |
| if (this.rrTargets == null) { |
| this.rrTargets = new HashMap(); |
| } |
| DistributedMember m = this.rrTargets.get(r); |
| if (m == null) { |
| m = getOwnerForKey(r, key); |
| this.rrTargets.put(r, m); |
| } |
| return m; |
| } |
| |
| private Set<DistributedMember> getTxRemoteParticpants(final DM dm) { |
| if (this.txRemoteParticpants == null) { |
| Set<DistributedMember> txParticpants = target2realDeals.keySet(); |
| this.txRemoteParticpants = new HashSet<DistributedMember>(txParticpants); |
| // Remove local member from remote participant list |
| this.txRemoteParticpants.remove(dm.getId()); |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.doPrecommit txParticpants = " |
| + txParticpants |
| + " ,txRemoteParticpants=" |
| + this.txRemoteParticpants + " ,originator=" + dm.getId()); |
| } |
| } |
| return txRemoteParticpants; |
| } |
| |
| private boolean doPrecommit() { |
| boolean finalResult = true; |
| final GemFireCacheImpl cache = GemFireCacheImpl |
| .getExisting("Applying Dist TX Precommit"); |
| final DM dm = cache.getDistributionManager(); |
| |
| // Create Tx Participants |
| Set<DistributedMember> txParticpants = target2realDeals.keySet(); |
| Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm); |
| |
| // Determine if the set of VMs for any of the Regions for this TX have |
| // changed |
| DistributedRegion dr = null; |
| HashSet<LocalRegion> affectedRegions = new HashSet<LocalRegion>(); |
| for (DistTXCoordinatorInterface distTXStateStub : target2realDeals.values()) { |
| affectedRegions.clear(); |
| distTXStateStub.gatherAffectedRegions(affectedRegions, true, false); |
| for (LocalRegion lr : affectedRegions) { |
| if (lr.getScope().isLocal()) { |
| continue; |
| } |
| // [DISTTX] TODO what about PR? |
| if (lr instanceof DistributedRegion) { |
| dr = (DistributedRegion) lr; |
| CacheDistributionAdvisor adv = dr.getCacheDistributionAdvisor(); |
| Set newRegionMemberView = adv.adviseTX(); |
| |
| if (!txParticpants.containsAll(newRegionMemberView)) { |
| logger |
| .warn(LocalizedMessage |
| .create( |
| LocalizedStrings.TXCommitMessage_NEW_MEMBERS_FOR_REGION_0_ORIG_LIST_1_NEW_LIST_2, |
| new Object[] { dr, txParticpants, newRegionMemberView })); |
| } |
| } |
| } |
| } |
| |
| // create processor and precommit message |
| DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor = new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor( |
| this.getTxId(), dm, txRemoteParticpants, target2realDeals); |
| // TODO [DISTTX} whats ack threshold? |
| processor.enableSevereAlertProcessing(); |
| final DistTXPrecommitMessage precommitMsg = new DistTXPrecommitMessage( |
| this.getTxId(), this.onBehalfOfClientMember, processor); |
| |
| // send precommit message to remote nodes |
| for (DistributedMember remoteNode : txRemoteParticpants) { |
| DistTXCoordinatorInterface remoteTXStateStub = target2realDeals |
| .get(remoteNode); |
| if (remoteTXStateStub.isTxState()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString( |
| "DistPeerTXStateStub", remoteTXStateStub.getClass() |
| .getSimpleName())); |
| } |
| try { |
| remoteTXStateStub.setPrecommitMessage(precommitMsg, dm); |
| remoteTXStateStub.precommit(); |
| } finally { |
| remoteTXStateStub.setPrecommitMessage(null, null); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " |
| + remoteNode); |
| } |
| } |
| |
| // Do precommit on local node |
| TreeSet<String> sortedRegionName = new TreeSet<>(); |
| DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId()); |
| if (localTXState != null) { |
| if (!localTXState.isTxState()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString( |
| "DistTXStateOnCoordinator", localTXState.getClass() |
| .getSimpleName())); |
| } |
| localTXState.precommit(); |
| boolean localResult = localTXState.getPreCommitResponse(); |
| TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap = new TreeMap<String, ArrayList<DistTxThinEntryState>>(); |
| ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null; |
| if (localResult) { |
| localResult = ((DistTXStateOnCoordinator) localTXState) |
| .populateDistTxEntryStateList(entryStateSortedMap); |
| if (localResult) { |
| entryEventList = new ArrayList<ArrayList<DistTxThinEntryState>>( |
| entryStateSortedMap.values()); |
| populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit local = " |
| + dm.getId() + " ,entryEventList=" |
| + printEntryEventList(entryEventList) + " ,txRegionVersionsMap=" |
| + printEntryEventMap(this.txEntryEventMap) + " ,result= " |
| + localResult + " ,finalResult-old= " + finalResult); |
| } |
| finalResult = finalResult && localResult; |
| } |
| |
| /* |
| * [DISTTX] TODO Any test hooks |
| */ |
| // if (internalAfterIndividualSend != null) { |
| // internalAfterIndividualSend.run(); |
| // } |
| |
| /* |
| * [DISTTX] TODO see how to handle exception |
| */ |
| |
| /* |
| * [DISTTX] TODO Any test hooks |
| */ |
| // if (internalAfterIndividualCommitProcess != null) { |
| // // Testing callback |
| // internalAfterIndividualCommitProcess.run(); |
| // } |
| |
| { // Wait for results |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| processor.waitForPrecommitCompletion(); |
| |
| // [DISTTX} TODO Handle stats |
| // dm.getStats().incCommitWaits(); |
| |
| Map<DistributedMember, DistTxPrecommitResponse> remoteResults = processor |
| .getCommitResponseMap(); |
| for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults |
| .entrySet()) { |
| DistributedMember target = e.getKey(); |
| DistTxPrecommitResponse remoteResponse = e.getValue(); |
| ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = remoteResponse.getDistTxEntryEventList(); |
| populateEntryEventMap(target, entryEventList, sortedRegionName); |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = " |
| + target |
| + " ,sortedRegions" |
| + sortedRegionName |
| + " ,entryEventList=" |
| + printEntryEventList(entryEventList) |
| + " ,txEntryEventMap=" |
| + printEntryEventMap(this.txEntryEventMap) |
| + " ,result= " |
| + remoteResponse.getCommitState() |
| + " ,finalResult-old= " |
| + finalResult); |
| } |
| finalResult = finalResult && remoteResponse.getCommitState(); |
| } |
| } |
| |
| /* |
| * [DISTTX] TODO Write similar method to take out exception |
| * |
| * [DISTTX] TODO Handle Reliable regions |
| */ |
| // if (this.hasReliableRegions) { |
| // checkDistributionReliability(distMap, processor); |
| // } |
| |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.doPrecommit finalResult= " |
| + finalResult); |
| } |
| return finalResult; |
| } |
| |
| /* |
| * Handle response of precommit reply |
| * |
| * Go over list of region versions for this target and fill map |
| */ |
| private void populateEntryEventMap(DistributedMember target, |
| ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, |
| TreeSet<String> sortedRegionName) { |
| if (this.txEntryEventMap == null) { |
| this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>(); |
| } |
| |
| DistTXCoordinatorInterface distTxIface = target2realDeals.get(target); |
| if (distTxIface.getPrimaryTransactionalOperations() != null |
| && distTxIface.getPrimaryTransactionalOperations().size() > 0) { |
| sortedRegionName.clear(); |
| distTxIface.gatherAffectedRegionsName(sortedRegionName, true, false); |
| |
| if (sortedRegionName.size() != entryEventList.size()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("size of " |
| + sortedRegionName.size() + " {" + sortedRegionName + "}" |
| + " for target=" + target, entryEventList.size() + " {" |
| + entryEventList + "}")); |
| } |
| |
| int index = 0; |
| // Get region as per sorted order of region path |
| for (String rName : sortedRegionName) { |
| txEntryEventMap.put(rName, entryEventList.get(index++)); |
| } |
| } |
| } |
| |
| /* |
| * Populate list of regions for this target, while sending commit messages |
| */ |
| private void populateEntryEventList(DistributedMember target, |
| ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, |
| TreeSet<String> sortedRegionMap) { |
| DistTXCoordinatorInterface distTxItem = target2realDeals.get(target); |
| sortedRegionMap.clear(); |
| distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true); |
| |
| // Get region as per sorted order of region path |
| entryEventList.clear(); |
| for (String rName : sortedRegionMap) { |
| ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap |
| .get(rName); |
| if (entryStates == null) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString( |
| "entryStates for " + rName + " at target " + target, "null")); |
| } |
| entryEventList.add(entryStates); |
| } |
| } |
| |
| /* |
| * [DISTTX] TODO - Handle result TXMessage |
| */ |
| private boolean doCommit() { |
| boolean finalResult = true; |
| final GemFireCacheImpl cache = GemFireCacheImpl |
| .getExisting("Applying Dist TX Commit"); |
| final DM dm = cache.getDistributionManager(); |
| |
| // Create Tx Participants |
| Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm); |
| |
| // create processor and commit message |
| DistTXCommitMessage.DistTxCommitReplyProcessor processor = new DistTXCommitMessage.DistTxCommitReplyProcessor( |
| this.getTxId(), dm, txRemoteParticpants, target2realDeals); |
| // TODO [DISTTX} whats ack threshold? |
| processor.enableSevereAlertProcessing(); |
| final DistTXCommitMessage commitMsg = new DistTXCommitMessage( |
| this.getTxId(), this.onBehalfOfClientMember, processor); |
| |
| // send commit message to remote nodes |
| ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>(); |
| TreeSet<String> sortedRegionName = new TreeSet<>(); |
| for (DistributedMember remoteNode : txRemoteParticpants) { |
| DistTXCoordinatorInterface remoteTXStateStub = target2realDeals |
| .get(remoteNode); |
| if (remoteTXStateStub.isTxState()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString( |
| "DistPeerTXStateStub", remoteTXStateStub.getClass() |
| .getSimpleName())); |
| } |
| try { |
| populateEntryEventList(remoteNode, entryEventList, |
| sortedRegionName); |
| commitMsg.setEntryStateList(entryEventList); |
| remoteTXStateStub.setCommitMessage(commitMsg, dm); |
| remoteTXStateStub.commit(); |
| } finally { |
| remoteTXStateStub.setCommitMessage(null, null); |
| remoteTXStateStub.finalCleanup(); |
| } |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = " |
| + remoteNode |
| + " ,sortedRegions=" |
| + sortedRegionName |
| + " ,entryEventList=" |
| + printEntryEventList(entryEventList) |
| + " ,txEntryEventMap=" |
| + printEntryEventMap(this.txEntryEventMap)); |
| } |
| } |
| |
| // Do commit on local node |
| DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId()); |
| if (localTXState != null) { |
| if (!localTXState.isTxState()) { |
| throw new UnsupportedOperationInTransactionException( |
| LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString( |
| "DistTXStateOnCoordinator", localTXState.getClass() |
| .getSimpleName())); |
| } |
| populateEntryEventList(dm.getId(), entryEventList, sortedRegionName); |
| ((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList); |
| localTXState.commit(); |
| TXCommitMessage localResultMsg = localTXState.getCommitMessage(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.doCommit local = " |
| + dm.getId() + " ,sortedRegions=" + sortedRegionName |
| + " ,entryEventList=" + printEntryEventList(entryEventList) |
| + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) |
| + " ,result= " + localResultMsg != null + " ,finalResult-old= " |
| + finalResult); |
| } |
| finalResult = finalResult && (localResultMsg != null); |
| } |
| |
| /* |
| * [DISTTX] TODO Any test hooks |
| */ |
| // if (internalAfterIndividualSend != null) { |
| // internalAfterIndividualSend.run(); |
| // } |
| |
| /* |
| * [DISTTX] TODO see how to handle exception |
| */ |
| |
| /* |
| * [DISTTX] TODO Any test hooks |
| */ |
| // if (internalAfterIndividualCommitProcess != null) { |
| // // Testing callback |
| // internalAfterIndividualCommitProcess.run(); |
| // } |
| |
| { // Wait for results |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| processor.waitForPrecommitCompletion(); |
| |
| // [DISTTX} TODO Handle stats |
| dm.getStats().incCommitWaits(); |
| |
| Map<DistributedMember, TXCommitMessage> remoteResults = processor |
| .getCommitResponseMap(); |
| for (Entry<DistributedMember, TXCommitMessage> e : remoteResults |
| .entrySet()) { |
| DistributedMember target = e.getKey(); |
| TXCommitMessage remoteResultMsg = e.getValue(); |
| if (logger.isDebugEnabled()) { // TODO - make this trace level |
| logger |
| .debug("DistTXStateProxyImplOnCoordinator.doCommit got results from target = " |
| + target + " ,result= " + remoteResultMsg != null |
| + " ,finalResult-old= " + finalResult); |
| } |
| finalResult = finalResult && remoteResultMsg != null; |
| } |
| } |
| |
| /* |
| * [DISTTX] TODO Write similar method to take out exception |
| * |
| * [DISTTX] TODO Handle Reliable regions |
| */ |
| // if (this.hasReliableRegions) { |
| // checkDistributionReliability(distMap, processor); |
| // } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.doCommit finalResult= " |
| + finalResult); |
| } |
| return finalResult; |
| } |
| |
| /** |
| * For distributed transactions, this divides the user's putAll operation into |
| * multiple per bucket putAll ops(with entries to be put in that bucket) and |
| * then fires those using using appropriate TXStateStub (for target that host |
| * the corresponding bucket) |
| */ |
| @Override |
| public void postPutAll(DistributedPutAllOperation putallOp, |
| VersionedObjectList successfulPuts, LocalRegion region) { |
| if (putallOp.putAllData.length == 0) { |
| return; |
| } |
| if (region instanceof DistributedRegion) { |
| super.postPutAll(putallOp, successfulPuts, region); |
| } else { |
| region.getCancelCriterion().checkCancelInProgress(null); // fix for bug |
| // #43651 |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll " |
| + "processing putAll op for region {}, size of putAllOp " |
| + "is {}", region, putallOp.putAllData.length); |
| } |
| |
| |
| //map of bucketId to putall op for this bucket |
| HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap = |
| new HashMap<Integer, DistributedPutAllOperation>(); |
| //map of bucketId to TXStateStub for target that hosts this bucket |
| HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap = |
| new HashMap<Integer, DistTXCoordinatorInterface>(); |
| |
| //separate the putall op per bucket |
| for (int i=0; i<putallOp.putAllData.length; i++) { |
| assert (putallOp.putAllData[i] != null); |
| Object key = putallOp.putAllData[i].key; |
| int bucketId = putallOp.putAllData[i].getBucketId(); |
| |
| DistributedPutAllOperation putAllForBucket = |
| bucketToPutallMap.get(bucketId);; |
| if (putAllForBucket == null) { |
| EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region, |
| Operation.PUTALL_CREATE, key, |
| putallOp.putAllData[i].getValue()); |
| event.setEventId(putallOp.putAllData[i].getEventID()); |
| putAllForBucket = new DistributedPutAllOperation( |
| event, putallOp.putAllDataSize, putallOp.isBridgeOp); |
| bucketToPutallMap.put(bucketId, putAllForBucket); |
| } |
| putAllForBucket.addEntry(putallOp.putAllData[i]); |
| |
| KeyInfo ki = new KeyInfo(key, null, null); |
| DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, region); |
| bucketToTxStateStubMap.put(bucketId, tsi); |
| } |
| |
| // fire a putAll operation for each bucket using appropriate TXStateStub |
| // (for target that host this bucket) |
| |
| // [DISTTX] [TODO] Perf: Can this be further optimized? |
| // This sends putAll in a loop to each target bucket (and waits for ack) |
| // one after another.Could we send respective putAll messages to all |
| // targets using same reply processor and wait on it? |
| for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap |
| .entrySet()) { |
| Integer bucketId = e.getKey(); |
| DistTXCoordinatorInterface dtsi = e.getValue(); |
| DistributedPutAllOperation putAllForBucket = bucketToPutallMap |
| .get(bucketId); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll processing" |
| + " putAll for ##bucketId = {}, ##txStateStub = {}, " |
| + "##putAllOp = {}" |
| , bucketId, dtsi, putAllForBucket); |
| } |
| dtsi.postPutAll(putAllForBucket, successfulPuts, region); |
| } |
| } |
| } |
| |
| /** |
| * For distributed transactions, this divides the user's removeAll operation |
| * into multiple per bucket removeAll ops(with entries to be removed from that |
| * bucket) and then fires those using using appropriate TXStateStub (for |
| * target that host the corresponding bucket) |
| */ |
| @Override |
| public void postRemoveAll(DistributedRemoveAllOperation op, |
| VersionedObjectList successfulOps, LocalRegion region) { |
| if (op.removeAllData.length == 0) { |
| return; |
| } |
| if (region instanceof DistributedRegion) { |
| super.postRemoveAll(op, successfulOps, region); |
| } else { |
| region.getCancelCriterion().checkCancelInProgress(null); // fix for bug |
| // #43651 |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll " |
| + "processing removeAll op for region {}, size of removeAll " |
| + "is {}", region, op.removeAllDataSize); |
| } |
| |
| //map of bucketId to removeAll op for this bucket |
| HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap = |
| new HashMap<Integer, DistributedRemoveAllOperation>(); |
| //map of bucketId to TXStateStub for target that hosts this bucket |
| HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap = |
| new HashMap<Integer, DistTXCoordinatorInterface>(); |
| |
| //separate the removeAll op per bucket |
| for (int i=0; i<op.removeAllData.length; i++) { |
| assert (op.removeAllData[i] != null); |
| Object key = op.removeAllData[i].key; |
| int bucketId = op.removeAllData[i].getBucketId(); |
| |
| DistributedRemoveAllOperation removeAllForBucket = |
| bucketToRemoveAllMap.get(bucketId); |
| if (removeAllForBucket == null) { |
| EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region, |
| Operation.REMOVEALL_DESTROY, key, null); |
| removeAllForBucket = new DistributedRemoveAllOperation( |
| event, op.removeAllDataSize, op.isBridgeOp); |
| bucketToRemoveAllMap.put(bucketId, removeAllForBucket); |
| } |
| removeAllForBucket.addEntry(op.removeAllData[i]); |
| |
| KeyInfo ki = new KeyInfo(key, null, null); |
| DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, region); |
| bucketToTxStateStubMap.put(bucketId, tsi); |
| } |
| |
| // fire a removeAll operation for each bucket using appropriate TXStateStub |
| // (for target that host this bucket) |
| |
| // [DISTTX] [TODO] Perf: Can this be further optimized? |
| // This sends putAll in a loop to each target bucket (and waits for ack) |
| // one after another.Could we send respective putAll messages to all |
| // targets using same reply processor and wait on it? |
| for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap |
| .entrySet()) { |
| Integer bucketId = e.getKey(); |
| DistTXCoordinatorInterface dtsi = e.getValue(); |
| DistributedRemoveAllOperation removeAllForBucket = bucketToRemoveAllMap |
| .get(bucketId); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll processing" |
| + " removeAll for ##bucketId = {}, ##txStateStub = {}, " |
| + "##removeAllOp = {}" |
| , bucketId, dtsi, removeAllForBucket); |
| } |
| dtsi.postRemoveAll(removeAllForBucket, successfulOps, region); |
| } |
| |
| } |
| } |
| |
| @Override |
| public boolean isCreatedOnDistTxCoordinator() { |
| return true; |
| } |
| |
| public static String printEntryEventMap( |
| HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) { |
| StringBuilder str = new StringBuilder(); |
| str.append(" ("); |
| str.append(txRegionVersionsMap.size()); |
| str.append(")=[ "); |
| for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap |
| .entrySet()) { |
| str.append(" {").append(entry.getKey()); |
| str.append(":").append("size(").append(entry.getValue().size()) |
| .append(")"); |
| str.append("=").append(entry.getValue()).append("}, "); |
| } |
| str.append(" } "); |
| return str.toString(); |
| } |
| |
| public static String printEntryEventList( |
| ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) { |
| StringBuilder str = new StringBuilder(); |
| str.append(" ("); |
| str.append(entryEventList.size()); |
| str.append(")=[ "); |
| for (ArrayList<DistTxThinEntryState> entry : entryEventList) { |
| str.append(" ( "); |
| str.append(entry.size()); |
| str.append(" )={").append(entry); |
| str.append(" } "); |
| } |
| str.append(" ] "); |
| return str.toString(); |
| } |
| |
| /* |
| * Do not return null |
| */ |
| public DistributedMember getOwnerForKey(LocalRegion r, KeyInfo key) { |
| DistributedMember m = r.getOwnerForKey(key); |
| if (m == null) { |
| GemFireCacheImpl cache = GemFireCacheImpl.getExisting("getOwnerForKey"); |
| m = cache.getDistributedSystem().getDistributedMember(); |
| } |
| return m; |
| } |
| } |