/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.internal.cache;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.TransactionInDoubtException;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DistTXPrecommitMessage.DistTxPrecommitResponse;
import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.DistClientTXStateStub;
import org.apache.geode.internal.cache.tx.DistTxEntryEvent;
import org.apache.geode.internal.statistics.StatisticsClock;

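/**
* TX state proxy used on the member that coordinates a distributed transaction. It keeps one
* {@link DistTXCoordinatorInterface} per participating member (a {@link DistTXStateOnCoordinator}
* when the coordinator itself hosts data, otherwise a {@link DistPeerTXStateStub}) and drives the
* two-phase protocol: {@link #commit()} first runs a precommit round on every participant and only
* then sends the commit messages; if the precommit fails the transaction is rolled back, and if
* the commit phase fails a {@link TransactionInDoubtException} is thrown.
*/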
public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/**
* A map from distributed system member to either {@link DistPeerTXStateStub} or
* {@link DistTXStateOnCoordinator} (in case the TX coordinator is also a data node)
*/
private final HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals =
new HashMap<>();
private HashMap<InternalRegion, DistributedMember> rrTargets;
private Set<DistributedMember> txRemoteParticpants = null; // other than local
private HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
public DistTXStateProxyImplOnCoordinator(InternalCache cache, TXManagerImpl managerImpl, TXId id,
InternalDistributedMember clientMember, StatisticsClock statisticsClock) {
super(cache, managerImpl, id, clientMember, statisticsClock);
}
public DistTXStateProxyImplOnCoordinator(InternalCache cache, TXManagerImpl managerImpl, TXId id,
boolean isjta, StatisticsClock statisticsClock) {
super(cache, managerImpl, id, isjta, statisticsClock);
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.internal.cache.TXStateInterface#commit()
*
* [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure these messages reach
* all participants
*/
@Override
public void commit() throws CommitConflictException {
boolean preserveTx = false;
boolean precommitResult = false;
try {
// create a map of secondaries (for PR) / replicas (for RR) to stubs, used to send
// the commit message to those copies
HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals =
getSecondariesAndReplicasForTxOps();
// add it to the existing map and then send commit to all copies
target2realDeals.putAll(otherTargets2realDeals);
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.commit target2realDeals = " + target2realDeals);
}
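// Phase 1: precommit on every participant (remote stubs and the local state, if any); a
// failed precommit is rolled back in the finally block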
precommitResult = doPrecommit();
if (precommitResult) {
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.commit Going for commit ");
}
boolean phase2commitDone = doCommit();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit "
+ (phase2commitDone ? "Done" : "Failed"));
}
if (!phase2commitDone) {
throw new TransactionInDoubtException(
"Commit failed on cache server");
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.commit precommitResult = " + precommitResult);
}
}
} catch (UnsupportedOperationInTransactionException e) {
// fix for #42490
preserveTx = true;
throw e;
} finally {
if (!precommitResult) {
rollback();
}
inProgress = preserveTx;
}
}
/**
* Creates a map of all secondaries (for PR) / replicas (for RR) to the stubs used to send the
* commit message to them.
*/
private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
InternalDistributedMember currentNode =
getCache().getInternalDistributedSystem().getDistributedMember();
HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals =
new HashMap<>();
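// For every primary op already recorded against a target, find the other members hosting a
// copy of the affected bucket (PR) or region (RR) and make sure each of them has a stub
// carrying the op as a secondary operation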
for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals.entrySet()) {
DistributedMember originalTarget = e.getKey();
DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
ArrayList<DistTxEntryEvent> primaryTxOps =
distPeerTxStateStub.getPrimaryTransactionalOperations();
for (DistTxEntryEvent dtop : primaryTxOps) {
InternalRegion internalRegion = dtop.getRegion();
// replicas or secondaries
Set<InternalDistributedMember> otherNodes = null;
if (internalRegion instanceof PartitionedRegion) {
Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop.getRegion())
.getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
allNodes.remove(originalTarget);
otherNodes = allNodes;
} else if (internalRegion instanceof DistributedRegion) {
otherNodes = ((DistributedRegion) internalRegion).getCacheDistributionAdvisor()
.adviseInitializedReplicates();
otherNodes.remove(originalTarget);
}
if (otherNodes != null) {
for (InternalDistributedMember dm : otherNodes) {
// check whether a stub for this target already exists due to another TX op on that node
DistTXCoordinatorInterface existingDistPeerTXStateStub = target2realDeals.get(dm);
if (existingDistPeerTXStateStub == null) {
existingDistPeerTXStateStub = secondaryTarget2realDeals.get(dm);
if (existingDistPeerTXStateStub == null) {
DistTXCoordinatorInterface newTxStub = null;
if (currentNode.equals(dm)) {
// [DISTTX] TODO add a test case for this condition?
newTxStub = new DistTXStateOnCoordinator(this, false, getStatisticsClock());
} else {
newTxStub = new DistPeerTXStateStub(this, dm, onBehalfOfClientMember);
}
newTxStub.addSecondaryTransactionalOperations(dtop);
secondaryTarget2realDeals.put(dm, newTxStub);
} else {
existingDistPeerTXStateStub.addSecondaryTransactionalOperations(dtop);
}
} else {
existingDistPeerTXStateStub.addSecondaryTransactionalOperations(dtop);
}
}
}
}
}
return secondaryTarget2realDeals;
}
@Override
public void rollback() {
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.rollback Going for rollback ");
}
boolean finalResult = false;
final DistributionManager dm = getCache().getDistributionManager();
try {
// Create Tx Participants
Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
// create processor and rollback message
DistTXRollbackMessage.DistTxRollbackReplyProcessor processor =
new DistTXRollbackMessage.DistTxRollbackReplyProcessor(this.getTxId(), dm,
txRemoteParticpants, target2realDeals);
// TODO [DISTTX] what is the ack threshold?
processor.enableSevereAlertProcessing();
final DistTXRollbackMessage rollbackMsg =
new DistTXRollbackMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send rollback message to remote nodes
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"DistPeerTXStateStub",
remoteTXStateStub.getClass().getSimpleName()));
}
try {
remoteTXStateStub.setRollbackMessage(rollbackMsg, dm);
remoteTXStateStub.rollback();
} finally {
remoteTXStateStub.setRollbackMessage(null, null);
remoteTXStateStub.finalCleanup();
}
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = " + remoteNode);
}
}
// Do rollback on local node
DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"DistTXStateOnCoordinator",
localTXState.getClass().getSimpleName()));
}
localTXState.rollback();
boolean localResult = localTXState.getRollbackResponse();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.rollback local = " + dm.getId()
+ " ,result= " + localResult + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && localResult;
}
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualSend != null) {
// internalAfterIndividualSend.run();
// }
/*
* [DISTTX] TODO see how to handle exception
*/
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualCommitProcess != null) {
// // Testing callback
// internalAfterIndividualCommitProcess.run();
// }
{ // Wait for results
dm.getCancelCriterion().checkCancelInProgress(null);
processor.waitForPrecommitCompletion();
// [DISTTX] TODO Handle stats
// dm.getStats().incCommitWaits();
Map<DistributedMember, Boolean> remoteResults = processor.getRollbackResponseMap();
for (Entry<DistributedMember, Boolean> e : remoteResults.entrySet()) {
DistributedMember target = e.getKey();
Boolean remoteResult = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = " + target
+ " ,result= " + remoteResult + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResult;
}
}
} finally {
inProgress = false;
}
/*
* [DISTTX] TODO Write similar method to take out exception
*
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
// checkDistributionReliability(distMap, processor);
// }
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.rollback finalResult= " + finalResult);
}
}
/**
* {@inheritDoc}
*/
@Override
public TXStateInterface getRealDeal(KeyInfo key, InternalRegion r) {
if (r != null) {
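// Route the op to the member owning the data for this key: the primary for partitioned
// regions and buckets, or a chosen member for replicated regions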
target = null;
// wait for the region to be initialized (fixes bug 44652)
r.waitOnInitialization(r.getInitializationLatchBeforeGetInitialImage());
if (r instanceof PartitionedRegion) {
target = getOwnerForKey(r, key);
} else if (r instanceof BucketRegion) {
target = ((BucketRegion) r).getBucketAdvisor().getPrimary();
// target = r.getMyId();
} else { // replicated region
target = getRRTarget(key, r);
}
this.realDeal = target2realDeals.get(target);
}
if (this.realDeal == null) {
// assert (r != null);
if (r == null) { // TODO: stop gap to get tests working
this.realDeal = new DistTXStateOnCoordinator(this, false, getStatisticsClock());
target = this.txMgr.getDM().getId();
} else {
// Code to keep going forward
if (r.hasServerProxy()) {
// TODO [DISTTX] See what we need for client?
this.realDeal =
new DistClientTXStateStub(r.getCache(), r.getDistributionManager(), this, target, r);
if (r.getScope().isDistributed()) {
if (txDistributedClientWarningIssued.compareAndSet(false, true)) {
logger.warn(
"Distributed region {} is being used in a client-initiated transaction. The transaction will only affect servers and this client. To keep from seeing this message use 'local' scope in client regions used in transactions.",
r.getFullPath());
}
}
} else {
// r != null but no server proxy: run on this coordinator or on a peer target
if (target == null || target.equals(this.txMgr.getDM().getId())) {
this.realDeal = new DistTXStateOnCoordinator(this, false, getStatisticsClock());
} else {
this.realDeal = new DistPeerTXStateStub(this, target, onBehalfOfClientMember);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
this.realDeal, this.txMgr.getDM().getId(), this, target/* , new Throwable() */);
}
target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
if (logger.isDebugEnabled()) {
logger
.debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
+ target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
}
@Override
public TXStateInterface getRealDeal(DistributedMember t) {
assert t != null;
this.realDeal = target2realDeals.get(target);
if (this.realDeal == null) {
this.target = t;
this.realDeal = new DistPeerTXStateStub(this, target, onBehalfOfClientMember);
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
this.realDeal, this.txMgr.getDM().getId());
}
if (!this.realDeal.isDistTx() || this.realDeal.isCreatedOnDistTxCoordinator()
|| !this.realDeal.isTxState()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
}
target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
+ target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
}
/*
* [DISTTX] TODO Do some optimization
*/
private DistributedMember getRRTarget(KeyInfo key, InternalRegion r) {
if (this.rrTargets == null) {
this.rrTargets = new HashMap<>();
}
DistributedMember m = this.rrTargets.get(r);
if (m == null) {
m = getOwnerForKey(r, key);
this.rrTargets.put(r, m);
}
return m;
}
private Set<DistributedMember> getTxRemoteParticpants(final DistributionManager dm) {
if (this.txRemoteParticpants == null) {
Set<DistributedMember> txParticpants = target2realDeals.keySet();
this.txRemoteParticpants = new HashSet<DistributedMember>(txParticpants);
// Remove local member from remote participant list
this.txRemoteParticpants.remove(dm.getId());
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit txParticpants = "
+ txParticpants + " ,txRemoteParticpants=" + this.txRemoteParticpants + " ,originator="
+ dm.getId());
}
}
return txRemoteParticpants;
}
private boolean doPrecommit() {
boolean finalResult = true;
final DistributionManager dm = getCache().getDistributionManager();
Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
// create processor and precommit message
DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor =
new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(this.getTxId(), dm,
txRemoteParticpants, target2realDeals);
// TODO [DISTTX] what is the ack threshold?
processor.enableSevereAlertProcessing();
final DistTXPrecommitMessage precommitMsg =
new DistTXPrecommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send precommit message to remote nodes
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"DistPeerTXStateStub",
remoteTXStateStub.getClass().getSimpleName()));
}
try {
remoteTXStateStub.setPrecommitMessage(precommitMsg, dm);
remoteTXStateStub.precommit();
} finally {
remoteTXStateStub.setPrecommitMessage(null, null);
}
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " + remoteNode);
}
}
// Do precommit on local node
TreeSet<String> sortedRegionName = new TreeSet<>();
DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"DistTXStateOnCoordinator",
localTXState.getClass().getSimpleName()));
}
localTXState.precommit();
boolean localResult = localTXState.getPreCommitResponse();
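// A successful local precommit also yields per-region entry version state
// (DistTxThinEntryState), recorded here so it can be shipped back with the commit messages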
TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap =
new TreeMap<String, ArrayList<DistTxThinEntryState>>();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
if (localResult) {
localResult = ((DistTXStateOnCoordinator) localTXState)
.populateDistTxEntryStateList(entryStateSortedMap);
if (localResult) {
entryEventList =
new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName);
}
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit local = " + dm.getId()
+ " ,entryEventList=" + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
+ printEntryEventMap(this.txEntryEventMap) + " ,result= " + localResult
+ " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && localResult;
}
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualSend != null) {
// internalAfterIndividualSend.run();
// }
/*
* [DISTTX] TODO see how to handle exception
*/
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualCommitProcess != null) {
// // Testing callback
// internalAfterIndividualCommitProcess.run();
// }
{ // Wait for results
dm.getCancelCriterion().checkCancelInProgress(null);
processor.waitForPrecommitCompletion();
// [DISTTX] TODO Handle stats
// dm.getStats().incCommitWaits();
Map<DistributedMember, DistTxPrecommitResponse> remoteResults =
processor.getCommitResponseMap();
for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults.entrySet()) {
DistributedMember target = e.getKey();
DistTxPrecommitResponse remoteResponse = e.getValue();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList =
remoteResponse.getDistTxEntryEventList();
populateEntryEventMap(target, entryEventList, sortedRegionName);
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = "
+ target + " ,sortedRegions" + sortedRegionName + " ,entryEventList="
+ printEntryEventList(entryEventList) + " ,txEntryEventMap="
+ printEntryEventMap(this.txEntryEventMap) + " ,result= "
+ remoteResponse.getCommitState() + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResponse.getCommitState();
}
}
/*
* [DISTTX] TODO Write similar method to take out exception
*
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
// checkDistributionReliability(distMap, processor);
// }
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit finalResult= " + finalResult);
}
return finalResult;
}
/*
* Handle the response of a precommit reply:
*
* Go over the list of region versions for this target and fill the txEntryEventMap.
*/
private void populateEntryEventMap(DistributedMember target,
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionName) {
if (this.txEntryEventMap == null) {
this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>();
}
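// The reply carries one entry-state list per affected region, in sorted region-name order, so
// it can be matched positionally against the sorted region names gathered below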
DistTXCoordinatorInterface distTxIface = target2realDeals.get(target);
if (distTxIface.getPrimaryTransactionalOperations() != null
&& distTxIface.getPrimaryTransactionalOperations().size() > 0) {
sortedRegionName.clear();
distTxIface.gatherAffectedRegionsName(sortedRegionName, true, false);
if (sortedRegionName.size() != entryEventList.size()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"size of " + sortedRegionName.size() + " {" + sortedRegionName + "}"
+ " for target=" + target,
entryEventList.size() + " {" + entryEventList + "}"));
}
int index = 0;
// Get region as per sorted order of region path
for (String rName : sortedRegionName) {
txEntryEventMap.put(rName, entryEventList.get(index++));
}
}
}
/*
* Populate the per-region entry-state list for this target, used while sending commit messages
*/
private void populateEntryEventList(DistributedMember target,
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionMap) {
DistTXCoordinatorInterface distTxItem = target2realDeals.get(target);
sortedRegionMap.clear();
distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true);
// Get region as per sorted order of region path
entryEventList.clear();
for (String rName : sortedRegionMap) {
ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap.get(rName);
if (entryStates == null) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"entryStates for " + rName + " at target " + target, "null"));
}
entryEventList.add(entryStates);
}
}
/*
* [DISTTX] TODO - Handle result TXMessage
*/
private boolean doCommit() {
boolean finalResult = true;
final DistributionManager dm = getCache().getDistributionManager();
// Create Tx Participants
Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
// create processor and commit message
DistTXCommitMessage.DistTxCommitReplyProcessor processor =
new DistTXCommitMessage.DistTxCommitReplyProcessor(this.getTxId(), dm, txRemoteParticpants,
target2realDeals);
// TODO [DISTTX] what is the ack threshold?
processor.enableSevereAlertProcessing();
final DistTXCommitMessage commitMsg =
new DistTXCommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send commit message to remote nodes
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>();
TreeSet<String> sortedRegionName = new TreeSet<>();
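// Each remote commit message carries the entry version state gathered during precommit for the
// regions hosted on that member (see populateEntryEventList)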
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"DistPeerTXStateStub",
remoteTXStateStub.getClass().getSimpleName()));
}
try {
populateEntryEventList(remoteNode, entryEventList, sortedRegionName);
commitMsg.setEntryStateList(entryEventList);
remoteTXStateStub.setCommitMessage(commitMsg, dm);
remoteTXStateStub.commit();
} finally {
remoteTXStateStub.setCommitMessage(null, null);
remoteTXStateStub.finalCleanup();
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = "
+ remoteNode + " ,sortedRegions=" + sortedRegionName + " ,entryEventList="
+ printEntryEventList(entryEventList) + " ,txEntryEventMap="
+ printEntryEventMap(this.txEntryEventMap));
}
}
// Do commit on local node
DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
String.format("Expected %s during a distributed transaction but got %s",
"DistTXStateOnCoordinator",
localTXState.getClass().getSimpleName()));
}
populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
localTXState.commit();
TXCommitMessage localResultMsg = localTXState.getCommitMessage();
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.doCommit local = " + dm.getId() + " ,sortedRegions="
+ sortedRegionName + " ,entryEventList=" + printEntryEventList(entryEventList)
+ " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+ (localResultMsg != null) + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && (localResultMsg != null);
}
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualSend != null) {
// internalAfterIndividualSend.run();
// }
/*
* [DISTTX] TODO see how to handle exception
*/
/*
* [DISTTX] TODO Any test hooks
*/
// if (internalAfterIndividualCommitProcess != null) {
// // Testing callback
// internalAfterIndividualCommitProcess.run();
// }
{ // Wait for results
dm.getCancelCriterion().checkCancelInProgress(null);
processor.waitForPrecommitCompletion();
// [DISTTX] TODO Handle stats
dm.getStats().incCommitWaits();
Map<DistributedMember, TXCommitMessage> remoteResults = processor.getCommitResponseMap();
for (Entry<DistributedMember, TXCommitMessage> e : remoteResults.entrySet()) {
DistributedMember target = e.getKey();
TXCommitMessage remoteResultMsg = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug(
"DistTXStateProxyImplOnCoordinator.doCommit got results from target = " + target
+ " ,result= " + (remoteResultMsg != null) + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResultMsg != null;
}
}
/*
* [DISTTX] TODO Write similar method to take out exception
*
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
// checkDistributionReliability(distMap, processor);
// }
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doCommit finalResult= " + finalResult);
}
return finalResult;
}
/**
* For distributed transactions, this divides the user's putAll operation into multiple per-bucket
* putAll ops (with the entries to be put in that bucket) and then fires those using the
* appropriate TXStateStub (for the target that hosts the corresponding bucket)
*/
@Override
public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
InternalRegion reg) {
if (putallOp.putAllData.length == 0) {
return;
}
if (reg instanceof DistributedRegion) {
super.postPutAll(putallOp, successfulPuts, reg);
} else {
reg.getCancelCriterion().checkCancelInProgress(null); // fix for bug #43651
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.postPutAll "
+ "processing putAll op for region {}, size of putAllOp " + "is {}",
reg, putallOp.putAllData.length);
}
// map of bucketId to putall op for this bucket
HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap =
new HashMap<Integer, DistributedPutAllOperation>();
// map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
new HashMap<Integer, DistTXCoordinatorInterface>();
// separate the putall op per bucket
for (int i = 0; i < putallOp.putAllData.length; i++) {
assert (putallOp.putAllData[i] != null);
Object key = putallOp.putAllData[i].key;
int bucketId = putallOp.putAllData[i].getBucketId();
DistributedPutAllOperation putAllForBucket = bucketToPutallMap.get(bucketId);
if (putAllForBucket == null) {
// TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, reg,
Operation.PUTALL_CREATE, key, putallOp.putAllData[i].getValue(reg.getCache()));
event.setEventId(putallOp.putAllData[i].getEventID());
putAllForBucket =
new DistributedPutAllOperation(event, putallOp.putAllDataSize, putallOp.isBridgeOp);
bucketToPutallMap.put(bucketId, putAllForBucket);
}
putallOp.putAllData[i].setFakeEventID();
putAllForBucket.addEntry(putallOp.putAllData[i]);
KeyInfo ki = new KeyInfo(key, null, null);
DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, reg);
bucketToTxStateStubMap.put(bucketId, tsi);
}
// fire a putAll operation for each bucket using the appropriate TXStateStub
// (for the target that hosts this bucket)
// [DISTTX] [TODO] Perf: Can this be further optimized?
// This sends putAll in a loop to each target bucket (and waits for the ack)
// one after another. Could we send the respective putAll messages to all
// targets using the same reply processor and wait on it?
for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap.entrySet()) {
Integer bucketId = e.getKey();
DistTXCoordinatorInterface dtsi = e.getValue();
DistributedPutAllOperation putAllForBucket = bucketToPutallMap.get(bucketId);
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.postPutAll processing"
+ " putAll for ##bucketId = {}, ##txStateStub = {}, " + "##putAllOp = {}",
bucketId, dtsi, putAllForBucket);
}
dtsi.postPutAll(putAllForBucket, successfulPuts, reg);
}
}
}
/**
* For distributed transactions, this divides the user's removeAll operation into multiple
* per-bucket removeAll ops (with the entries to be removed from that bucket) and then fires those
* using the appropriate TXStateStub (for the target that hosts the corresponding bucket)
*/
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
InternalRegion reg) {
if (op.removeAllData.length == 0) {
return;
}
if (reg instanceof DistributedRegion) {
super.postRemoveAll(op, successfulOps, reg);
} else {
reg.getCancelCriterion().checkCancelInProgress(null); // fix for bug #43651
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.postRemoveAll "
+ "processing removeAll op for region {}, size of removeAll " + "is {}",
reg, op.removeAllDataSize);
}
// map of bucketId to removeAll op for this bucket
HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap =
new HashMap<Integer, DistributedRemoveAllOperation>();
// map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
new HashMap<Integer, DistTXCoordinatorInterface>();
// separate the removeAll op per bucket
for (int i = 0; i < op.removeAllData.length; i++) {
assert (op.removeAllData[i] != null);
Object key = op.removeAllData[i].key;
int bucketId = op.removeAllData[i].getBucketId();
DistributedRemoveAllOperation removeAllForBucket = bucketToRemoveAllMap.get(bucketId);
if (removeAllForBucket == null) {
// TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, reg, key);
event.setEventId(op.removeAllData[i].getEventID());
removeAllForBucket =
new DistributedRemoveAllOperation(event, op.removeAllDataSize, op.isBridgeOp);
bucketToRemoveAllMap.put(bucketId, removeAllForBucket);
}
op.removeAllData[i].setFakeEventID();
removeAllForBucket.addEntry(op.removeAllData[i]);
KeyInfo ki = new KeyInfo(key, null, null);
DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, reg);
bucketToTxStateStubMap.put(bucketId, tsi);
}
// fire a removeAll operation for each bucket using the appropriate TXStateStub
// (for the target that hosts this bucket)
// [DISTTX] [TODO] Perf: Can this be further optimized?
// This sends removeAll in a loop to each target bucket (and waits for the ack)
// one after another. Could we send the respective removeAll messages to all
// targets using the same reply processor and wait on it?
for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap.entrySet()) {
Integer bucketId = e.getKey();
DistTXCoordinatorInterface dtsi = e.getValue();
DistributedRemoveAllOperation removeAllForBucket = bucketToRemoveAllMap.get(bucketId);
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
+ " removeAll for ##bucketId = {}, ##txStateStub = {}, " + "##removeAllOp = {}",
bucketId, dtsi, removeAllForBucket);
}
dtsi.postRemoveAll(removeAllForBucket, successfulOps, reg);
}
}
}
@Override
public boolean isCreatedOnDistTxCoordinator() {
return true;
}
public static String printEntryEventMap(
HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(txRegionVersionsMap.size());
str.append(")=[ ");
for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap
.entrySet()) {
str.append(" {").append(entry.getKey());
str.append(":").append("size(").append(entry.getValue().size()).append(")");
str.append("=").append(entry.getValue()).append("}, ");
}
str.append(" } ");
return str.toString();
}
public static String printEntryEventList(
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(entryEventList.size());
str.append(")=[ ");
for (ArrayList<DistTxThinEntryState> entry : entryEventList) {
str.append(" ( ");
str.append(entry.size());
str.append(" )={").append(entry);
str.append(" } ");
}
str.append(" ] ");
return str.toString();
}
/*
* Never returns null; falls back to the local distributed member when the region reports no owner
* for the key.
*/
public DistributedMember getOwnerForKey(InternalRegion r, KeyInfo key) {
DistributedMember m = r.getOwnerForKey(key);
if (m == null) {
m = getCache().getDistributedSystem().getDistributedMember();
}
return m;
}
}