blob: 15e0a903e540767698d78c0af7a4ce7c2f55ab18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionWriter;
import org.apache.geode.cache.TransactionWriterException;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState;
import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.DistTxEntryEvent;
import org.apache.geode.internal.cache.tx.DistTxKeyInfo;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.statistics.StatisticsClock;
/**
* TxState on a data node VM
*
*
*/
public class DistTXState extends TXState {
@MutableForTesting
public static Runnable internalBeforeApplyChanges; // TODO: cleanup this test hook
@MutableForTesting
public static Runnable internalBeforeNonTXBasicPut; // TODO: cleanup this test hook
private boolean updatingTxStateDuringPreCommit = false;
public DistTXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub,
StatisticsClock statisticsClock) {
super(proxy, onBehalfOfRemoteStub, statisticsClock);
}
@Override
protected void cleanup() {
super.cleanup();
// Do nothing for now
}
/*
* If this is a primary member, for each entry in TXState, generate next region version and store
* in the entry.
*/
public void updateRegionVersions() {
Iterator<Map.Entry<InternalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<InternalRegion, TXRegionState> me = it.next();
InternalRegion r = me.getKey();
TXRegionState txrs = me.getValue();
// Generate next region version only on the primary
if (!txrs.isCreatedDuringCommit()) {
try {
Set entries = txrs.getEntryKeys();
if (!entries.isEmpty()) {
Iterator entryIt = entries.iterator();
while (entryIt.hasNext()) {
Object key = entryIt.next();
TXEntryState txes = txrs.getTXEntryState(key);
RegionVersionVector rvv = r.getVersionVector();
if (rvv != null) {
long v = rvv.getNextVersion();
// txes.setNextRegionVersion(v);
txes.getDistTxEntryStates().setRegionVersion(v);
if (logger.isDebugEnabled()) {
logger.debug("Set next region version to " + v + " for region=" + r.getName()
+ "in TXEntryState for key" + key);
}
}
}
}
} catch (DiskAccessException dae) {
r.handleDiskAccessException(dae);
throw dae;
}
}
}
}
/*
* Iterate through all changes and for those changes for which this member hosts a primary bucket,
* generate a tail key and store in the TXEntryState. From there it is expected to be carried over
* to the secondaries in phase-2 commit. In phase-2 commit, the both the primary and secondaries
* should use this tail key to enqueue into parallel queues.
*/
public void generateTailKeysForParallelDispatcherEvents() {
Iterator<Map.Entry<InternalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<InternalRegion, TXRegionState> me = it.next();
InternalRegion r = me.getKey();
TXRegionState txrs = me.getValue();
InternalRegion region = txrs.getRegion();
// Check if it is a bucket region
if (region.isUsedForPartitionedRegionBucket()) {
// Check if it is a primary bucket
BucketRegion bRegion = (BucketRegion) region;
if (!(bRegion instanceof AbstractBucketRegionQueue)) {
if (bRegion.getBucketAdvisor().isPrimary()) {
// Generate a tail key for each entry
Set entries = txrs.getEntryKeys();
if (!entries.isEmpty()) {
Iterator entryIt = entries.iterator();
while (entryIt.hasNext()) {
Object key = entryIt.next();
TXEntryState txes = txrs.getTXEntryState(key);
long tailKey = ((BucketRegion) region).generateTailKey();
txes.getDistTxEntryStates().setTailKey(tailKey);
}
}
} // end if primary
}
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.internal.cache.TXStateInterface#commit()
*
* Take Locks Does conflict check on primary ([DISTTX] TODO on primary only) Invoke TxWriter
*/
@Override
public void precommit()
throws CommitConflictException, UnsupportedOperationInTransactionException {
if (logger.isDebugEnabled()) {
logger.debug("DistTXState.precommit transaction {} is closed {} ", getTransactionId(),
this.closed/* , new Throwable() */);
}
if (this.closed) {
return;
}
synchronized (this.completionGuard) {
this.completionStarted = true;
}
if (onBehalfOfRemoteStub && !proxy.isCommitOnBehalfOfRemoteStub()) {
throw new UnsupportedOperationInTransactionException(
"Cannot commit a transaction being run on behalf of a remote thread");
}
cleanupNonDirtyRegions();
/*
* Lock buckets so they can't be rebalanced then perform the conflict check to fix #43489
*/
try {
lockBucketRegions();
} catch (PrimaryBucketException pbe) {
// not sure what to do here yet
RuntimeException re = new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
re.initCause(pbe);
throw re;
}
if (this.locks == null) {
reserveAndCheck();
}
// For internal testing
if (this.internalAfterConflictCheck != null) {
this.internalAfterConflictCheck.run();
}
updateRegionVersions();
generateTailKeysForParallelDispatcherEvents();
/*
* If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort the
* transaction.
*/
TransactionWriter writer = this.proxy.getTxMgr().getWriter();
if (!firedWriter && writer != null) {
try {
firedWriter = true;
writer.beforeCommit(getEvent());
} catch (TransactionWriterException twe) {
cleanup();
throw new CommitConflictException(twe);
} catch (VirtualMachineError err) {
// cleanup(); this allocates objects so I don't think we can do it -
// that leaves the TX open, but we are poison pilling so we should be
// ok??
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
cleanup(); // rollback the transaction!
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
throw new CommitConflictException(t);
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.internal.cache.TXStateInterface#commit()
*
* Apply changes release locks
*/
@Override
public void commit() throws CommitConflictException {
if (logger.isDebugEnabled()) {
logger.debug("DistTXState.commit transaction {} is closed {} ", getTransactionId(),
this.closed/* , new Throwable() */);
}
if (this.closed) {
return;
}
try {
List/* <TXEntryStateWithRegionAndKey> */ entries = generateEventOffsets();
if (logger.isDebugEnabled()) {
logger.debug("commit entries " + entries);
}
TXCommitMessage msg = null;
try {
attachFilterProfileInformation(entries);
if (internalBeforeApplyChanges != null) {
internalBeforeApplyChanges.run();
}
// apply changes to the cache
applyChanges(entries);
// For internal testing
if (this.internalAfterApplyChanges != null) {
this.internalAfterApplyChanges.run();
}
// [DISTTX]TODO:
// Build a message specifically for those nodes who
// hold gateway senders and listeners but not a copy of the buckets
// on which changes in this tx are done.
// This is applicable only for partitioned regions and
// serial gateway senders.
// This works only if the coordinator and sender are not the same node.
// For same sender as coordinator, this results in a hang, which needs to be addressed.
// If an another method of notifying adjunct receivers is implemented,
// the following two lines should be commented out.
msg = buildMessageForAdjunctReceivers();
msg.send(this.locks.getDistributedLockId());
// Fire callbacks collected in the local txApply* executions
firePendingCallbacks();
this.commitMessage = buildCompleteMessage();
} finally {
if (msg != null) {
msg.releaseViewVersions();
}
this.locks.releaseLocal();
// For internal testing
if (this.internalAfterReleaseLocalLocks != null) {
this.internalAfterReleaseLocalLocks.run();
}
}
} finally {
cleanup();
}
}
/**
* this builds a new DistTXAdjunctCommitMessage and returns it
*
* @return the new message
*/
protected TXCommitMessage buildMessageForAdjunctReceivers() {
TXCommitMessage msg =
new DistTXAdjunctCommitMessage(this.proxy.getTxId(), this.proxy.getTxMgr().getDM(), this);
Iterator<Map.Entry<InternalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<InternalRegion, TXRegionState> me = it.next();
InternalRegion r = me.getKey();
TXRegionState txrs = me.getValue();
// only on the primary
if (r.isUsedForPartitionedRegionBucket() && !txrs.isCreatedDuringCommit()) {
txrs.buildMessageForAdjunctReceivers(r, msg);
}
}
return msg;
}
@Override
public void rollback() {
super.rollback();
// Cleanup is called next
}
protected boolean applyOpsOnRedundantCopy(DistributedMember sender,
ArrayList<DistTxEntryEvent> secondaryTransactionalOperations) {
boolean returnValue = true;
try {
boolean result = true;
// Start TxState Update During PreCommit phase
setUpdatingTxStateDuringPreCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("DistTXState.applyOpOnRedundantCopy: size of "
+ "secondaryTransactionalOperations = {}", secondaryTransactionalOperations.size());
}
/*
* Handle Put Operations meant for secondary.
*
* @see org.apache.geode.internal.cache.partitioned.PutMessage.
* operateOnPartitionedRegion(DistributionManager, PartitionedRegion, long)
*
* [DISTTX] TODO need to handle other operations
*/
for (DistTxEntryEvent dtop : secondaryTransactionalOperations) {
if (logger.isDebugEnabled()) {
logger.debug("DistTXState.applyOpOnRedundantCopy: processing dist " + "tx operation {}",
dtop);
}
dtop.setDistributedMember(sender);
dtop.setOriginRemote(false);
/*
* [DISTTX} TODO handle call back argument version tag and other settings in PutMessage
*/
String failureReason = null;
try {
if (dtop.getRegion() == null) {
// Tx event from the peer.
if (dtop.getRegionName() == null) {
throw new InternalGemFireException("Region is unavailable on DistTxEntryEvent.");
}
dtop.setRegion((LocalRegion) getCache().getRegion(dtop.getRegionName()));
}
if (dtop.getKeyInfo().isDistKeyInfo()) {
dtop.getKeyInfo().setCheckPrimary(false);
} else {
dtop.setKeyInfo(new DistTxKeyInfo(dtop.getKeyInfo()));
dtop.getKeyInfo().setCheckPrimary(false);
}
// apply the op
result = applyIndividualOp(dtop);
if (!result) { // make sure the region hasn't gone away
dtop.getRegion().checkReadiness();
}
} catch (CacheWriterException cwe) {
result = false;
failureReason = "CacheWriterException";
} catch (PrimaryBucketException pbe) {
result = false;
failureReason = "PrimaryBucketException";
} catch (InvalidDeltaException ide) {
result = false;
failureReason = "InvalidDeltaException";
} catch (DataLocationException e) {
result = false;
failureReason = "DataLocationException";
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXState.applyOpOnRedundantCopy {} ##op {}, " + "##region {}, ##key {}",
(result ? " sucessfully applied op " : " failed to apply op due to " + failureReason),
dtop.getOperation(), dtop.getRegion().getName(), dtop.getKey());
}
if (!result) {
returnValue = false;
break;
}
}
} finally {
// End TxState Update During PreCommit phase
setUpdatingTxStateDuringPreCommit(false);
}
return returnValue;
}
/**
* Apply the individual tx op on secondary
*
* Calls local function such as putEntry instead of putEntryOnRemote as for this
* {@link DistTXStateOnCoordinator} as events will always be local. In parent {@link DistTXState}
* class will call remote version of functions
*
*/
protected boolean applyIndividualOp(DistTxEntryEvent dtop) throws DataLocationException {
boolean result = true;
if (dtop.op.isUpdate() || dtop.op.isCreate()) {
if (dtop.op.isPutAll()) {
assert (dtop.getPutAllOperation() != null);
// [DISTTX] TODO what do with versions next?
final VersionedObjectList versions =
new VersionedObjectList(dtop.getPutAllOperation().putAllDataSize, true,
dtop.getRegion().getConcurrencyChecksEnabled());
postPutAll(dtop.getPutAllOperation(), versions, dtop.getRegion());
} else {
result = putEntryOnRemote(dtop, false/* ifNew */, false/* ifOld */,
null/* expectedOldValue */, false/* requireOldValue */, 0L/* lastModified */,
true/*
* overwriteDestroyed *not* used
*/);
}
} else if (dtop.op.isDestroy()) {
if (dtop.op.isRemoveAll()) {
assert (dtop.getRemoveAllOperation() != null);
// [DISTTX] TODO what do with versions next?
final VersionedObjectList versions =
new VersionedObjectList(dtop.getRemoveAllOperation().removeAllDataSize, true,
dtop.getRegion().getConcurrencyChecksEnabled());
postRemoveAll(dtop.getRemoveAllOperation(), versions, dtop.getRegion());
} else {
destroyOnRemote(dtop, false/* TODO [DISTTX] */, null/*
* TODO [DISTTX]
*/);
}
} else if (dtop.op.isInvalidate()) {
invalidateOnRemote(dtop, true/* TODO [DISTTX] */, false/*
* TODO [DISTTX]
*/);
} else {
logger.debug("DistTXCommitPhaseOneMessage: unsupported TX operation {}", dtop);
assert (false);
}
return result;
}
public boolean isUpdatingTxStateDuringPreCommit() {
return updatingTxStateDuringPreCommit;
}
/**
* For Dist Tx
*
* @param updatingTxState if updating TxState during Commit Phase
*/
private void setUpdatingTxStateDuringPreCommit(boolean updatingTxState)
throws UnsupportedOperationInTransactionException {
this.updatingTxStateDuringPreCommit = updatingTxState;
if (logger.isDebugEnabled()) {
logger.debug("DistTXState setUpdatingTxStateDuringPreCommit incoming {} final {} ",
updatingTxState, this.updatingTxStateDuringPreCommit);
}
}
@Override
public TXRegionState writeRegion(InternalRegion r) {
TXRegionState result = readRegion(r);
if (result == null) {
if (r instanceof BucketRegion) {
result = new TXBucketRegionState((BucketRegion) r, this);
} else {
result = new TXRegionState(r, this);
}
result.setCreatedDuringCommit(this.updatingTxStateDuringPreCommit);
this.regions.put(r, result);
if (logger.isDebugEnabled()) {
logger.debug("DistTXState writeRegion flag {} new region-state {} ",
this.updatingTxStateDuringPreCommit, result);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("DistTXState writeRegion flag {} region-state {} ",
this.updatingTxStateDuringPreCommit, result);
}
}
return result;
}
/*
* [DISTTX] Note: This has been overridden here to associate DistKeyInfo with event to disable
* primary check(see DistKeyInfo.setCheckPrimary(false)) when this gets called on secondary of a
* PR
*
* For TX this needs to be a PR passed in as region
*
*
* @see org.apache.geode.internal.cache.InternalDataView#postPutAll(org.apache
* .gemfire.internal.cache.DistributedPutAllOperation, java.util.Map,
* org.apache.geode.internal.cache.LocalRegion)
*/
@Override
public void postPutAll(final DistributedPutAllOperation putallOp,
final VersionedObjectList successfulPuts, InternalRegion reg) {
final InternalRegion theRegion;
if (reg instanceof BucketRegion) {
theRegion = ((BucketRegion) reg).getPartitionedRegion();
} else {
theRegion = reg;
}
/*
* Don't fire events here.
*/
/*
* We are on the data store, we don't need to do anything here. Commit will push them out.
*/
/*
* We need to put this into the tx state.
*/
theRegion.syncBulkOp(new Runnable() {
@Override
public void run() {
// final boolean requiresRegionContext =
// theRegion.keyRequiresRegionContext();
InternalDistributedMember myId =
theRegion.getDistributionManager().getDistributionManagerId();
for (int i = 0; i < putallOp.putAllDataSize; ++i) {
@Released
EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion, myId, myId, i,
putallOp.putAllData, false, putallOp.getBaseEvent().getContext(), false,
!putallOp.getBaseEvent().isGenerateCallbacks());
try {
// ev.setPutAllOperation(putallOp);
// below if condition returns true on secondary when TXState is
// updated in preCommit only on secondary
// In this case disable the primary check by calling
// distKeyInfo.setCheckPrimary(false);
if (isUpdatingTxStateDuringPreCommit()) {
KeyInfo keyInfo = ev.getKeyInfo();
DistTxKeyInfo distKeyInfo = new DistTxKeyInfo(keyInfo);
distKeyInfo.setCheckPrimary(false);
ev.setKeyInfo(distKeyInfo);
}
/*
* Whenever commit is called, especially when its a DistTxStateOnCoordinator the txState
* is set to null in @see TXManagerImpl.commit() and thus when @see LocalRegion.basicPut
* will be called as in this function, they will not found a TxState with call for
* getDataView()
*/
if (!(theRegion.getDataView() instanceof TXStateInterface)) {
if (putEntry(ev, false, false, null, false, 0L, false)) {
successfulPuts.addKeyAndVersion(putallOp.putAllData[i].key, null);
}
} else if (theRegion.basicPut(ev, false, false, null, false)) {
successfulPuts.addKeyAndVersion(putallOp.putAllData[i].key, null);
}
} finally {
ev.release();
}
}
}
}, putallOp.getBaseEvent().getEventId());
}
@Override
public void postRemoveAll(final DistributedRemoveAllOperation op,
final VersionedObjectList successfulOps, InternalRegion reg) {
final InternalRegion theRegion;
if (reg instanceof BucketRegion) {
theRegion = ((BucketRegion) reg).getPartitionedRegion();
} else {
theRegion = reg;
}
/*
* Don't fire events here. We are on the data store, we don't need to do anything here. Commit
* will push them out. We need to put this into the tx state.
*/
theRegion.syncBulkOp(new Runnable() {
@Override
public void run() {
InternalDistributedMember myId =
theRegion.getDistributionManager().getDistributionManagerId();
for (int i = 0; i < op.removeAllDataSize; ++i) {
@Released
EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion, myId, myId, i,
op.removeAllData, false, op.getBaseEvent().getContext(), false,
!op.getBaseEvent().isGenerateCallbacks());
try {
ev.setRemoveAllOperation(op);
// below if condition returns true on secondary when TXState is
// updated in preCommit only on secondary
// In this case disable the primary check by calling
// distKeyInfo.setCheckPrimary(false);
if (isUpdatingTxStateDuringPreCommit()) {
KeyInfo keyInfo = ev.getKeyInfo();
DistTxKeyInfo distKeyInfo = new DistTxKeyInfo(keyInfo);
distKeyInfo.setCheckPrimary(false);
ev.setKeyInfo(distKeyInfo);
}
/*
* Whenever commit is called, especially when its a DistTxStateOnCoordinator the txState
* is set to null in @see TXManagerImpl.commit() and thus when basicDestroy will be
* called will be called as in i.e. @see LocalRegion.basicDestroy, they will not found a
* TxState with call for getDataView()
*
* [DISTTX] TODO verify if this is correct to call destroyExistingEntry directly?
*/
try {
if (!(theRegion.getDataView() instanceof TXStateInterface)) {
destroyExistingEntry(ev, true/* should we invoke cacheWriter? */, null);
} else {
theRegion.basicDestroy(ev, true/* should we invoke cacheWriter? */, null);
}
} catch (EntryNotFoundException ignore) {
}
successfulOps.addKeyAndVersion(op.removeAllData[i].key, null);
} finally {
ev.release();
}
}
}
}, op.getBaseEvent().getEventId());
}
@Override
public boolean isDistTx() {
return true;
}
/*
* Populate list of entry states for each region while replying precommit
*/
public boolean populateDistTxEntryStateList(
TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap) {
for (Map.Entry<InternalRegion, TXRegionState> me : this.regions.entrySet()) {
InternalRegion r = me.getKey();
TXRegionState txrs = me.getValue();
String regionFullPath = r.getFullPath();
if (!txrs.isCreatedDuringCommit()) {
ArrayList<DistTxThinEntryState> entryStateList = new ArrayList<DistTxThinEntryState>();
boolean returnValue = txrs.populateDistTxEntryStateList(entryStateList);
if (returnValue) {
if (logger.isDebugEnabled()) {
logger.debug("DistTxState.populateDistTxEntryStateList Adding entries " + " with count="
+ entryStateList.size() + " for region " + regionFullPath + " . Added list="
+ entryStateList);
}
entryStateSortedMap.put(regionFullPath, entryStateList);
} else {
if (logger.isDebugEnabled()) {
logger.debug("DistTxState.populateDistTxEntryStateList Got exception for region "
+ regionFullPath);
}
return false;
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("DistTxState.populateDistTxEntryStateList Not adding entries for region "
+ regionFullPath);
}
}
}
return true;
}
/*
* Set list of entry states for each region while applying commit
*/
public void setDistTxEntryStates(ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
TreeMap<String, TXRegionState> regionSortedMap = new TreeMap<>();
for (TXRegionState txrs : this.regions.values()) {
if (txrs.isCreatedDuringCommit()) {
regionSortedMap.put(txrs.getRegion().getFullPath(), txrs);
}
}
int index = 0;
for (Entry<String, TXRegionState> me : regionSortedMap.entrySet()) {
String regionFullPath = me.getKey();
TXRegionState txrs = me.getValue();
ArrayList<DistTxThinEntryState> entryEvents = entryEventList.get(index++);
if (logger.isDebugEnabled()) {
logger.debug("DistTxState.setDistTxEntryStates For region=" + regionFullPath + " ,index="
+ index + " ,entryEvents=(" + entryEvents.size() + ")=" + entryEvents
+ " ,regionSortedMap=" + regionSortedMap.keySet());
}
txrs.setDistTxEntryStates(entryEvents);
}
}
}