blob: 2672323cc89c8266df943de4dc444984c66ca3af [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.Status;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Region.Entry;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.TransactionWriter;
import com.gemstone.gemfire.cache.TransactionWriterException;
import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess;
import com.gemstone.gemfire.distributed.TXManagerCancelledException;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/** TXState is the entity that tracks the transaction state on a per
* thread basis, noting changes to Region entries on a per operation
* basis. It lives on the node where transaction data exists.
*
* @author Mitch Thomas
*
* @since 4.0
*
* @see TXManagerImpl
*/
public class TXState implements TXStateInterface {
// Logger used for the debug output emitted by this class
protected static final Logger logger = LogService.getLogger();
// The timestamp (from CachePerfStats.getStatTime()) of when the transaction began
private final long beginTime;
// A map of transaction state by Region; keyed by region identity, not equals()
final IdentityHashMap<LocalRegion, TXRegionState> regions;
/** whether completion (commit or rollback) has been started */
protected boolean completionStarted;
/** whether the transaction has been completed and cleaned up */
protected boolean closed = false;
/**
 * Monitor used when setting completionStarted, and used by
 * waitForPreviousCompletion()/cleanup() to wait for and signal completion.
 */
protected final Object completionGuard = new Object();
// Lock request built by reserveAndCheck(); null until a reservation is made
protected TXLockRequest locks = null;
// Used for jta commit lifetime
private long jtaLifeTime;
/**
* Used to hand out modification serial numbers used to preserve
* the order of operation done by this transaction.
*/
private int modSerialNum;
// Events whose listener callbacks are fired by firePendingCallbacks()
// and released by freePendingCallbacks()
private final List<EntryEventImpl> pendingCallbacks =
new ArrayList<EntryEventImpl>();
// Internal testing hooks; each is run at a fixed point of commit processing
// when non-null (see the corresponding setXxx method)
private Runnable internalAfterReservation;
protected Runnable internalAfterConflictCheck;
protected Runnable internalAfterApplyChanges;
protected Runnable internalAfterReleaseLocalLocks;
Runnable internalDuringIndividualSend; // package scope allows TXCommitMessage use
Runnable internalAfterIndividualSend; // package scope allows TXCommitMessage use
Runnable internalDuringIndividualCommitProcess; // package scope allows TXCommitMessage use
Runnable internalAfterIndividualCommitProcess; // package scope allows TXCommitMessage use
protected Runnable internalAfterSend;
protected Runnable internalBeforeSend;
/**
* Used to generate eventIDs; captured by generateEventOffsets()
*/
private byte[] baseMembershipId;
/**
* Used to generate eventIDs; captured by generateEventOffsets()
*/
private long baseThreadId;
/**
* Used to generate eventIDs; captured by generateEventOffsets()
*/
private long baseSequenceId;
// The proxy that owns this state; source of the tx id, tx manager and cache
protected final TXStateProxy proxy;
// Set to true just before the TransactionWriter is invoked so that it is
// not invoked a second time for the same transaction
protected boolean firedWriter = false;
// Constructor-supplied flag; commit() refuses to run when this is true
// unless the proxy reports isCommitOnBehalfOfRemoteStub()
protected final boolean onBehalfOfRemoteStub;
// Set to true once lockBucketRegions() has finished its locking loop
protected boolean gotBucketLocks = false;
// The commit message built during commit(); null until then
protected TXCommitMessage commitMessage = null;
// NOTE(review): not referenced in the visible part of this file; presumably
// identifies the client on whose behalf this transaction runs - confirm
ClientProxyMembershipID bridgeContext = null;
/** keeps track of events, so as not to re-apply events*/
protected Set<EventID> seenEvents = new HashSet<EventID>();
/** keeps track of results of txPutEntry */
private Map<EventID, Boolean> seenResults = new HashMap<EventID, Boolean>();
// Shared sentinel instance; its use sites are outside the visible part of this file
static final TXEntryState ENTRY_EXISTS = new TXEntryState();
/**
 * Creates the state for a single transaction.
 *
 * @param proxy the proxy that owns this state
 * @param onBehalfOfRemoteStub stored as-is; consulted by {@link #commit()}
 */
public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) {
  // Take the begin timestamp first so it is as close as possible to creation.
  this.beginTime = CachePerfStats.getStatTime();
  this.proxy = proxy;
  this.onBehalfOfRemoteStub = onBehalfOfRemoteStub;
  this.regions = new IdentityHashMap<LocalRegion, TXRegionState>();

  // All of these testing hooks start out unset.
  this.internalBeforeSend = null;
  this.internalAfterSend = null;
  this.internalDuringIndividualSend = null;
  this.internalAfterIndividualSend = null;
  this.internalAfterConflictCheck = null;
  this.internalAfterApplyChanges = null;
  this.internalAfterReleaseLocalLocks = null;
}
/**
 * Tells whether the given event's id has already been recorded.
 *
 * @return false when the event carries no id or the id is not recorded
 */
private boolean hasSeenEvent(EntryEventImpl event) {
  assert event != null;
  final EventID id = event.getEventId();
  return id != null && this.seenEvents.contains(id);
}
/**
 * Remembers the id of the given event; events without an id are ignored.
 */
private void recordEvent(EntryEventImpl event) {
  assert event != null;
  final EventID id = event.getEventId();
  if (id == null) {
    return;
  }
  this.seenEvents.add(id);
}
/**
 * Remembers both the id of the given event and the result that was
 * produced for it. Nothing is stored for an event that carries no id.
 */
private void recordEventAndResult(EntryEventImpl event, boolean result) {
  recordEvent(event);
  final EventID id = event.getEventId();
  if (id == null) {
    return;
  }
  this.seenResults.put(id, Boolean.valueOf(result));
}
/**
 * Returns the result previously stored for the given event.
 * A result must already have been recorded for the event's id; with
 * assertions disabled a missing result fails on unboxing.
 */
private boolean getRecordedResult(EntryEventImpl event) {
  assert event != null;
  assert this.seenResults.containsKey(event.getEventId());
  final Boolean recorded = this.seenResults.get(event.getEventId());
  return recorded.booleanValue();
}
/**
 * Renders the runtime class, the identity hash code and the
 * onBehalfOfRemoteStub flag of this instance.
 */
@Override
public String toString() {
  return this.getClass()
      + "@"
      + System.identityHashCode(this)
      + " onBehalfOfRemoteStub:"
      + this.onBehalfOfRemoteStub;
}
/**
 * Returns the transaction id held by the owning proxy.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getTransactionId()
 */
public TransactionId getTransactionId() {
  final TransactionId txId = this.proxy.getTxId();
  return txId;
}
/**
 * Invokes the transactional listener callback for every pending event.
 * The callback kind is chosen from the event's operation: destroy,
 * invalidate, create, and otherwise update.
 */
public void firePendingCallbacks() {
  for (EntryEventImpl pending : getPendingCallbacks()) {
    final EnumListenerEvent callbackKind;
    if (pending.getOperation().isDestroy()) {
      callbackKind = EnumListenerEvent.AFTER_DESTROY;
    } else if (pending.getOperation().isInvalidate()) {
      callbackKind = EnumListenerEvent.AFTER_INVALIDATE;
    } else if (pending.getOperation().isCreate()) {
      callbackKind = EnumListenerEvent.AFTER_CREATE;
    } else {
      callbackKind = EnumListenerEvent.AFTER_UPDATE;
    }
    pending.getRegion().invokeTXCallbacks(callbackKind, pending, true);
  }
}
/**
 * Calls release() on every pending callback event. The list itself is
 * left untouched.
 */
public void freePendingCallbacks() {
  final Iterator<EntryEventImpl> pending = getPendingCallbacks().iterator();
  while (pending.hasNext()) {
    pending.next().release();
  }
}
/**
 * Returns the live (not copied) list of pending callback events.
 */
public List<EntryEventImpl> getPendingCallbacks() {
  return this.pendingCallbacks;
}
/**
 * Looks up the transactional state kept for the given region.
 *
 * @return the region's state, or null when none has been created
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#readRegion(com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public TXRegionState readRegion(LocalRegion r) {
  final TXRegionState existing = this.regions.get(r);
  return existing;
}
/**
 * Drops the transactional state of the given region, cleaning it up
 * if there was any.
 */
public void rmRegion(LocalRegion r) {
  final TXRegionState removed = this.regions.remove(r);
  if (removed == null) {
    return;
  }
  removed.cleanup(r);
}
/**
 * Returns the transactional state for the given region, creating and
 * registering it on first use. Bucket regions get a TXBucketRegionState,
 * every other region a plain TXRegionState.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#writeRegion(com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public TXRegionState writeRegion(LocalRegion r) {
  TXRegionState state = readRegion(r);
  if (state == null) {
    state = (r instanceof BucketRegion)
        ? new TXBucketRegionState((BucketRegion) r, this)
        : new TXRegionState(r, this);
    this.regions.put(r, state);
  }
  if (logger.isDebugEnabled()) {
    // The Throwable is only there to capture the calling stack in the log.
    logger.debug("TXState writeRegion flag {} region-state {} ",
        false, state, new Throwable());
  }
  return state;
}
/**
 * Returns the timestamp captured when this state was constructed.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getBeginTime()
 */
public long getBeginTime() {
  final long begun = this.beginTime;
  return begun;
}
/**
 * Sums the change counts reported by every region state of this transaction.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getChanges()
 */
public int getChanges() {
  int total = 0;
  for (TXRegionState regionState : this.regions.values()) {
    total += regionState.getChanges();
  }
  return total;
}
/**
 * A transaction is in progress for as long as it has not been closed.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#isInProgress()
 */
public boolean isInProgress() {
  final boolean finished = this.closed;
  return !finished;
}
/**
 * Advances the modification serial number by one and returns the new value.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#nextModSerialNum()
 */
public int nextModSerialNum() {
  return ++this.modSerialNum;
}
/**
 * Tells whether the modification serial number no longer fits in a byte.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#needsLargeModCount()
 */
public boolean needsLargeModCount() {
  return Byte.MAX_VALUE < this.modSerialNum;
}
/**
 * Builds and obtains the lock request for this transaction and then runs
 * the conflict check. Does nothing when the transaction is already closed.
 *
 * @throws CommitConflictException propagated unchanged from the conflict check
 */
protected void reserveAndCheck() throws CommitConflictException {
  if (!this.closed) {
    final long conflictStart = CachePerfStats.getStatTime();
    this.locks = createLockRequest();
    this.locks.obtain();
    // for now the stat only accounts for the dlock service time;
    // later this stat end should be moved to a finally block
    if (CachePerfStats.enableClockStats) {
      this.proxy.getTxMgr().getCachePerfStats()
          .incTxConflictCheckTime(CachePerfStats.getStatTime() - conflictStart);
    }
    // testing hook: runs between the reservation and the conflict check
    if (this.internalAfterReservation != null) {
      this.internalAfterReservation.run();
    }
    checkForConflicts();
  }
}
/** Returns the membership id captured by generateEventOffsets(); the array is not copied. */
byte[] getBaseMembershipId() {
  final byte[] id = this.baseMembershipId;
  return id;
}
/** Returns the thread id captured by generateEventOffsets(). */
long getBaseThreadId() {
  final long id = this.baseThreadId;
  return id;
}
/** Returns the sequence id captured by generateEventOffsets(). */
long getBaseSequenceId() {
  final long id = this.baseSequenceId;
  return id;
}
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationInTransactionException always
 */
@Override
public void precommit() throws CommitConflictException,
    UnsupportedOperationInTransactionException {
  final String message =
      LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
          .toLocalizedString("precommit");
  throw new UnsupportedOperationInTransactionException(message);
}
/**
 * Commits this transaction. A call on an already closed transaction is a no-op.
 *
 * Order of processing:
 *   1. mark completion as started (under completionGuard)
 *   2. reject the commit when this state is on behalf of a remote stub and the
 *      proxy does not report isCommitOnBehalfOfRemoteStub()
 *   3. drop non-dirty entries, lock bucket regions, reserve locks and check for
 *      conflicts (skipped when locks are already held, e.g. because
 *      beforeCompletion() already called reserveAndCheck())
 *   4. give the TransactionWriter, if any and not yet fired, a chance to abort
 *   5. apply the changes locally, build and send the commit message, fire the
 *      pending callbacks and build the complete commit message
 *   6. release local locks and, in every case, run cleanup()
 *
 * Note that when the TransactionWriter fails, cleanup() is called in the
 * catch clause and then again by the outer finally block.
 *
 * @throws CommitConflictException if the conflict check fails or the
 *         TransactionWriter throws anything other than a VirtualMachineError
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
 */
public void commit() throws CommitConflictException {
if (this.closed) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("committing transaction {}", getTransactionId());
}
// waitForPreviousCompletion() reads this flag under the same guard
synchronized(this.completionGuard) {
this.completionStarted = true;
}
if (onBehalfOfRemoteStub
&& !proxy.isCommitOnBehalfOfRemoteStub()) {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.TXState_CANNOT_COMMIT_REMOTED_TRANSACTION.toLocalizedString());
}
cleanupNonDirtyRegions();
try {
/*
* Lock buckets so they can't be rebalanced
* then perform the conflict check to fix #43489
*/
try {
lockBucketRegions();
} catch(PrimaryBucketException pbe) {
// not sure what to do here yet
// (currently reported to the caller as a rebalance of the transactional data)
RuntimeException re = new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString());
re.initCause(pbe);
throw re;
}
// locks is non-null when a reservation was already made for this transaction
if (this.locks == null) {
reserveAndCheck();
}
// For internal testing
if (this.internalAfterConflictCheck != null) {
this.internalAfterConflictCheck.run();
}
/*
* If there is a TransactionWriter plugged in,
* we need to to give it an opportunity to
* abort the transaction.
*/
TransactionWriter writer = this.proxy.getTxMgr().getWriter();
if(!firedWriter && writer!=null) {
try {
// set before the call so the writer is never invoked twice
firedWriter = true;
writer.beforeCommit(getEvent());
} catch(TransactionWriterException twe) {
cleanup();
throw new CommitConflictException(twe);
} catch (VirtualMachineError err) {
//cleanup(); this allocates objects so I don't think we can do it - that leaves the TX open, but we are poison pilling so we should be ok??
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
cleanup(); // rollback the transaction!
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
throw new CommitConflictException(t);
}
}
// also captures the base membership/thread/sequence ids used for event ids
List/*<TXEntryStateWithRegionAndKey>*/ entries = generateEventOffsets();
TXCommitMessage msg = null;
try {
/*
* In order to preserve data consistency,
* we need to:
* 1. Modify the cache first (applyChanges)
* 2. Ask for advice on who to send to (buildMessage)
* 3. Send out to other members.
*
* If this is done out of order, we will have problems with GII, split brain, and HA.
* See bug #41187
* @gregp
*/
attachFilterProfileInformation(entries);
// apply changes to the cache
applyChanges(entries);
// For internal testing
if (this.internalAfterApplyChanges != null) {
this.internalAfterApplyChanges.run();
}
// build and send the message
msg = buildMessage();
this.commitMessage = msg;
// For internal testing
if (this.internalBeforeSend != null) {
this.internalBeforeSend.run();
}
msg.send(this.locks.getDistributedLockId());
// For internal testing
if (this.internalAfterSend != null) {
this.internalAfterSend.run();
}
firePendingCallbacks();
/*
* This is to prepare the commit message for the caller, make sure all events are in there.
*/
this.commitMessage = buildCompleteMessage();
} finally {
// msg is still null when a step before buildMessage() failed
if (msg != null) {
msg.releaseViewVersions();
}
this.locks.releaseLocal();
// For internal testing
if (this.internalAfterReleaseLocalLocks != null) {
this.internalAfterReleaseLocalLocks.run();
}
}
} finally {
// always close the transaction and wake up waitForPreviousCompletion() callers
cleanup();
}
}
/**
 * For every entry that lives in a partitioned-region bucket, computes the
 * filter routing information and the adjunct recipients and stores both on
 * the entry state. Entries of other regions are left alone.
 *
 * @param entries list of TXEntryStateWithRegionAndKey
 */
protected void attachFilterProfileInformation(List entries) {
  for (Object element : entries) {
    final TXEntryStateWithRegionAndKey entry = (TXEntryStateWithRegionAndKey) element;
    try {
      if (!entry.r.isUsedForPartitionedRegionBucket()) {
        continue;
      }
      final BucketRegion bucket = (BucketRegion) entry.r;
      // The event must contain the bucket region
      final EntryEventImpl ev = (EntryEventImpl) entry.es.getEvent(
          entry.r, entry.key, entry.es.getTXRegionState().getTXState());
      try {
        // The routing information is derived from the PR advisor, not the bucket advisor.
        final FilterRoutingInfo fri = bucket.getPartitionedRegion().getRegionAdvisor()
            .adviseFilterRouting(ev, Collections.EMPTY_SET);
        entry.es.setFilterRoutingInfo(fri);
        final Set set = bucket.getAdjunctReceivers(ev, Collections.EMPTY_SET, new HashSet(), fri);
        entry.es.setAdjunctRecipients(set);
      } finally {
        ev.release();
      }
    } catch (RegionDestroyedException ex) {
      // The region was destroyed out from under us after conflict checking
      // passed. Behave as if the destroy happened right after the commit:
      // do nothing, which includes not distributing this region's commit data.
    } catch (CancelException ex) {
      // The cache was closed out from under us after conflict checking
      // passed. Nothing to do.
    }
  }
}
/**
 * Rolls this transaction back: marks completion as started and cleans up.
 * Does nothing when the transaction is already closed.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#rollback()
 */
public void rollback() {
  if (!this.closed) {
    synchronized (this.completionGuard) {
      this.completionStarted = true;
    }
    cleanup();
  }
}
/**
* This is a fix for bug #42228 where a client fails over from one server to
* another but gets a conflict on completion because completion had already
* been initiated and had not yet completed.
* <p>
* If completion has been started, this blocks on completionGuard until either
* a commit message has been set or the transaction has been closed; cleanup()
* calls notifyAll() on the guard. If the waiting thread is interrupted, the
* cancel criterion of the cache is checked, the interrupt flag is restored and
* the method returns true without waiting any further.
* @return true if a previous completion was in progress, false if completion
* had not been started
*/
public boolean waitForPreviousCompletion() {
synchronized(this.completionGuard) {// should have already been done, but just to be sure
if (!this.completionStarted) {
return false;
}
// loop guards against wake-ups that happen before completion is visible
while (this.commitMessage == null && !this.closed) {
if (logger.isDebugEnabled()) {
logger.debug("Waiting for previous completion for transaction {}", getTransactionId());
}
try {
this.completionGuard.wait();
} catch (InterruptedException e) {
// may throw if the cache is being cancelled
this.proxy.getCache().getCancelCriterion().checkCancelInProgress(e);
// preserve the interrupt for the caller
Thread.currentThread().interrupt();
return true;
}
} // while
}
return true;
}
/**
 * Captures the base membership, thread and sequence ids and then lets every
 * entry of this transaction generate its event offsets, in sorted order.
 *
 * @return the sorted list of TXEntryStateWithRegionAndKey that will be used
 *         to apply the ops on the nearside in the correct order
 */
protected List/*<TXEntryStateWithRegionAndKey>*/ generateEventOffsets() {
  this.baseMembershipId = EventID.getMembershipId(this.proxy.getTxMgr().getDM().getSystem());
  this.baseThreadId = EventID.getThreadId();
  this.baseSequenceId = EventID.getSequenceId();

  final List/*<TXEntryStateWithRegionAndKey>*/ sorted = getSortedEntries();
  if (logger.isDebugEnabled()) {
    logger.debug("generateEventOffsets() entries " + sorted
        + " RegionState Map=" + this.regions);
  }
  for (Object element : sorted) {
    final TXEntryStateWithRegionAndKey entry = (TXEntryStateWithRegionAndKey) element;
    entry.es.generateEventOffsets(this);
  }
  return sorted;
}
/**
 * Builds a fresh lock request to which every region state of this
 * transaction has contributed.
 */
private TXLockRequest createLockRequest() {
  final TXLockRequest request = new TXLockRequest();
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.createLockRequest(region, request);
  }
  return request;
}
/**
 * Runs the conflict check of every region state. A DiskAccessException is
 * first handed to the affected region and then rethrown.
 */
private void checkForConflicts() throws CommitConflictException, PrimaryBucketException {
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    try {
      regionState.checkForConflicts(region);
    } catch (DiskAccessException dae) {
      region.handleDiskAccessException(dae);
      throw dae;
    }
  }
}
/**
 * Takes the primary lock of every bucket region in this transaction for which
 * this member is currently primary, so the buckets cannot be rebalanced
 * before cleanup().
 *
 * Locks are taken with tryLock semantics. If any lock cannot be obtained, all
 * locks obtained so far in this pass are released, the thread sleeps for 50 ms
 * and the whole pass is repeated. gotBucketLocks is set once a pass completes.
 *
 * NOTE(review): the finally block below also runs when an exception is being
 * thrown out of the try block, so the unlock and the 50 ms sleep happen on
 * that path too. If the sleep is interrupted, the "return" inside the finally
 * block discards any exception in flight and returns normally without setting
 * gotBucketLocks and without holding any bucket lock - confirm this is the
 * intended behavior.
 *
 * @throws PrimaryBucketException declared here; no visible statement in this
 *         method throws it directly
 */
protected void lockBucketRegions() throws PrimaryBucketException {
boolean lockingSucceeded;
do {
lockingSucceeded = true;
Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
// buckets locked during this pass; released again if the pass fails
Set<BucketRegion> obtained = new HashSet<BucketRegion>();
while (it.hasNext()) {
Map.Entry<LocalRegion, TXRegionState> me = it.next();
LocalRegion r = me.getKey();
if (r instanceof BucketRegion && (((BucketRegion)r).getBucketAdvisor().isPrimary())) {
BucketRegion b = (BucketRegion)r;
/*
* Lock the primary bucket so it doesnt get rebalanced until we cleanup!
*/
boolean lockObtained = false;
try {
// use tryLocks to avoid hanging (bug #41708)
boolean locked = b.doLockForPrimary(true);
if (locked) {
obtained.add(b);
lockObtained = true;
} else {
// if we can't get locks then someone has a write-lock. To prevent
// deadlock (see bug #41708) we release locks and re-acquire them
r.getCancelCriterion().checkCancelInProgress(null);
if (logger.isDebugEnabled()) {
logger.debug("tryLock failed for commit on {}. Releasing locks and retrying", r.getFullPath());
}
// release locks and start over
// (the finally block below performs the release before the loop is left)
break;
}
} catch(RegionDestroyedException rde) {
if (logger.isDebugEnabled()) {
logger.debug("RegionDestroyedException while locking bucket region {}", r.getFullPath(),rde);
}
throw new TransactionDataRebalancedException("Bucket rebalanced during commit: " + r.getFullPath());
} finally {
if (!lockObtained) {
// fix for bug #41708 - unlock operation-locks already obtained
// Note: this message is also logged on the plain tryLock-failed path above,
// not only when an exception occurred.
if (logger.isDebugEnabled()) {
logger.debug("Unexpected exception while locking bucket {}", r.getFullPath());
}
for (BucketRegion br: obtained) {
br.doUnlockForPrimary();
}
// back off briefly before the next attempt
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
lockingSucceeded = false;
}
}
}
}
} while (!lockingSucceeded);
gotBucketLocks = true;
}
/**
 * Asks every region state to clean up its non-dirty entries.
 */
protected void cleanupNonDirtyRegions() {
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.cleanupNonDirtyEntries(region);
  }
}
/**
 * Creates a new TXCommitMessage and lets every region state add its part
 * through TXRegionState.buildMessage.
 *
 * @return the new message
 */
protected TXCommitMessage buildMessage() {
  final TXCommitMessage message =
      new TXCommitMessage(this.proxy.getTxId(), this.proxy.getTxMgr().getDM(), this);
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.buildMessage(region, message);
  }
  return message;
}
/**
 * Creates a new TXCommitMessage and lets every region state add its part
 * through TXRegionState.buildCompleteMessage. commit() stores the result as
 * the commit message handed back to the caller.
 *
 * @return the new message
 */
protected TXCommitMessage buildCompleteMessage() {
  final TXCommitMessage message =
      new TXCommitMessage(this.proxy.getTxId(), this.proxy.getTxMgr().getDM(), this);
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.buildCompleteMessage(region, message);
  }
  return message;
}
/**
 * Applies this transaction to the cache in three passes: notify every region
 * state that applying starts, apply each entry in the given order, then
 * notify every region state that applying has ended.
 *
 * @param entries sorted list of TXEntryStateWithRegionAndKey
 */
protected void applyChanges(List/*<TXEntryStateWithRegionAndKey>*/ entries) {
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.applyChangesStart(region, this);
  }

  for (Object element : entries) {
    final TXEntryStateWithRegionAndKey entry = (TXEntryStateWithRegionAndKey) element;
    try {
      entry.es.applyChanges(entry.r, entry.key, this);
    } catch (RegionDestroyedException ex) {
      // The region was destroyed out from under us after conflict checking
      // passed. Behave as if the destroy happened right after the commit:
      // do nothing, which includes not distributing this region's commit data.
    } catch (CancelException ex) {
      // The cache was closed out from under us after conflict checking
      // passed. Nothing to do.
    }
  }

  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.applyChangesEnd(region, this);
  }
}
/**
 * Creates a new TXEvent for this transaction and its cache.
 */
public TXEvent getEvent() {
  final Cache cache = getCache();
  return new TXEvent(this, cache);
}
/**
 * Marks this transaction closed and closes every region state; a second
 * call is a no-op.
 *
 * Note that cleanup() does more than this method needs to. This method only
 * has to do what is required when a Cache close happens while transactions
 * are still in progress. Currently that is only decrementing off-heap
 * refcounts, since off-heap memory lives after a cache close.
 */
@Override
public void close() {
  if (this.closed) {
    return;
  }
  this.closed = true;
  final Iterator<TXRegionState> states = this.regions.values().iterator();
  while (states.hasNext()) {
    states.next().close();
  }
}
/**
 * Closes this transaction: forgets the seen events and results, releases
 * the pending callback events, cleans up the lock request (if any), unlocks
 * the primary bucket locks taken by lockBucketRegions() and cleans up every
 * region state. Threads waiting on completionGuard are always notified, even
 * when one of those steps throws.
 */
protected void cleanup() {
  try {
    this.closed = true;
    this.seenEvents.clear();
    this.seenResults.clear();
    freePendingCallbacks();

    if (this.locks != null) {
      final long conflictStart = CachePerfStats.getStatTime();
      this.locks.cleanup();
      if (CachePerfStats.enableClockStats) {
        this.proxy.getTxMgr().getCachePerfStats()
            .incTxConflictCheckTime(CachePerfStats.getStatTime() - conflictStart);
      }
    }

    for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
      final LocalRegion r = regionEntry.getKey();
      final TXRegionState txrs = regionEntry.getValue();
      // Need to unlock the primary lock for rebalancing so that rebalancing can resume.
      if (this.gotBucketLocks
          && r instanceof BucketRegion
          && ((BucketRegion) r).getBucketAdvisor().isPrimary()) {
        try {
          ((BucketRegion) r).doUnlockForPrimary();
        } catch (RegionDestroyedException rde) {
          // ignore
          if (logger.isDebugEnabled()) {
            logger.debug("RegionDestroyedException while unlocking bucket region {}", r.getFullPath(), rde);
          }
        } catch (Exception e) {
          // ignore
          if (logger.isDebugEnabled()) {
            logger.debug("Exception while unlocking bucket region {} this is probably because the bucket was destroyed and never locked initially.",
                r.getFullPath(), e);
          }
        }
      }
      txrs.cleanup(r);
    }
  } finally {
    synchronized (this.completionGuard) {
      this.completionGuard.notifyAll();
    }
  }
}
/**
 * Collects the events of every region state.
 *
 * @return the shared empty list when there are no events, otherwise a
 *         sorted, unmodifiable list of them
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getEvents()
 */
public List getEvents() {
  final ArrayList collected = new ArrayList();
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.getEvents(region, collected, this);
  }
  if (!collected.isEmpty()) {
    Collections.sort(collected);
    return Collections.unmodifiableList(collected);
  }
  return Collections.EMPTY_LIST;
}
/**
 * Gathers the entries of every region state.
 *
 * @return the shared empty list when there are no entries, otherwise the
 *         (modifiable) list of TXEntryStateWithRegionAndKey in sorted order
 */
private List/*<TXEntryStateWithRegionAndKey>*/ getSortedEntries() {
  final ArrayList/*<TXEntryStateWithRegionAndKey>*/ gathered = new ArrayList();
  for (Map.Entry<LocalRegion, TXRegionState> regionEntry : this.regions.entrySet()) {
    final LocalRegion region = regionEntry.getKey();
    final TXRegionState regionState = regionEntry.getValue();
    regionState.getEntries(gathered, region);
  }
  if (!gathered.isEmpty()) {
    Collections.sort(gathered);
    return gathered;
  }
  return Collections.EMPTY_LIST;
}
/**
 * Used to keep track of the region and key associated with a TXEntryState.
 * Also used to sort the entries into the order in which they will be applied.
 * <p>
 * Ordering, equality and hash code are all derived solely from the sort
 * value of the wrapped entry state; the region and the key take no part.
 * @since 5.7
 */
static class TXEntryStateWithRegionAndKey implements Comparable {
  public final TXEntryState es;
  public final LocalRegion r;
  public final Object key;

  public TXEntryStateWithRegionAndKey(TXEntryState es, LocalRegion r, Object key) {
    this.es = es;
    this.r = r;
    this.key = key;
  }

  /** Returns the sort value of the wrapped entry state. */
  private int getSortValue() {
    return this.es.getSortValue();
  }

  /**
   * Orders two instances by their sort value.
   *
   * @param o the other TXEntryStateWithRegionAndKey
   * @return a negative number, zero or a positive number when this sort
   *         value is less than, equal to or greater than the other one
   * @throws ClassCastException if o is not a TXEntryStateWithRegionAndKey
   */
  public int compareTo(Object o) {
    TXEntryStateWithRegionAndKey other = (TXEntryStateWithRegionAndKey) o;
    final int mine = getSortValue();
    final int theirs = other.getSortValue();
    // Compare explicitly instead of subtracting: a subtraction overflows
    // (and reports the wrong sign) when the values are far enough apart.
    if (mine < theirs) {
      return -1;
    }
    return (mine == theirs) ? 0 : 1;
  }

  @Override
  public boolean equals(Object o) {
    // instanceof is already false for null, no separate null test needed
    if (!(o instanceof TXEntryStateWithRegionAndKey)) {
      return false;
    }
    return compareTo(o) == 0;
  }

  @Override
  public int hashCode() {
    return getSortValue();
  }
}
//////////////////////////////////////////////////////////////////
// JTA Synchronization implementation //
//////////////////////////////////////////////////////////////////
/**
 * JTA Synchronization callback run before the JTA transaction completes.
 * Clears the transaction state on the transaction manager, records the JTA
 * lifetime of this transaction, reserves the locks, runs the conflict check
 * and gives the TransactionWriter (if any) a chance to abort.
 *
 * @throws TXManagerCancelledException if this transaction is already closed
 * @throws SynchronizationCommitConflictException if a conflict is detected or
 *         the TransactionWriter fails; the commit failure is noted on the
 *         transaction manager first
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#beforeCompletion()
 */
public void beforeCompletion() throws SynchronizationCommitConflictException {
if (this.closed) {
throw new TXManagerCancelledException();
}
this.proxy.getTxMgr().setTXState(null);
final long opStart = CachePerfStats.getStatTime();
// elapsed time between construction of this state and the start of completion
this.jtaLifeTime = opStart - getBeginTime();
try {
// leaves this.locks set, so commit() will not reserve a second time
reserveAndCheck();
/*
* If there is a TransactionWriter plugged in,
* we need to to give it an opportunity to
* abort the transaction.
*/
TransactionWriter writer = this.proxy.getTxMgr().getWriter();
if(writer!=null) {
try {
// need to mark this so we don't fire again in commit
firedWriter = true;
writer.beforeCommit(getEvent());
} catch(TransactionWriterException twe) {
cleanup();
throw new CommitConflictException(twe);
} catch (VirtualMachineError err) {
//cleanup(); this allocates objects so I don't think we can do it - that leaves the TX open, but we are poison pilling so we should be ok??
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
cleanup(); // rollback the transaction!
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
throw new CommitConflictException(t);
}
}
} catch (CommitConflictException commitConflict) {
// covers conflicts from reserveAndCheck() as well as the wrapped writer failures above
this.proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
throw new SynchronizationCommitConflictException(LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0.toLocalizedString(getTransactionId()), commitConflict);
}
}
/**
 * JTA Synchronization callback run after the JTA transaction has completed.
 * For STATUS_COMMITTED the transaction is committed and the commit success is
 * noted on the transaction manager; for STATUS_ROLLEDBACK it is rolled back
 * and the rollback is noted. Any other status fails an assertion.
 *
 * @param status one of the javax.transaction.Status constants
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#afterCompletion(int)
 */
public void afterCompletion(int status) {
final long opStart = CachePerfStats.getStatTime();
switch (status) {
case Status.STATUS_COMMITTED:
// the locks are expected to have been reserved already (beforeCompletion() does so)
Assert.assertTrue(this.locks!=null,
"Gemfire Transaction afterCompletion called with illegal state.");
try {
this.proxy.getTxMgr().setTXState(null);
commit();
} catch (CommitConflictException error) {
// a conflict at this stage is treated as an assertion failure
Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId() +
" afterCompletion failed.due to CommitConflictException: " + error);
}
// jtaLifeTime here is the value already stored in the field (beforeCompletion() sets it)
this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
this.locks = null;
break;
case Status.STATUS_ROLLEDBACK:
// computed here; on this path the value is taken at rollback time
this.jtaLifeTime = opStart - getBeginTime();
this.proxy.getTxMgr().setTXState(null);
rollback();
this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
break;
default:
Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
}
}
/**
 * Installs the testing hook that reserveAndCheck() runs after the lock
 * request has been obtained but before the local conflict check.
 *
 * @param afterReservation the hook, or null to remove it
 */
public void setAfterReservation(Runnable afterReservation) {
  internalAfterReservation = afterReservation;
}
/**
 * Installs the testing hook that commit() runs after the conflict check has
 * completed but before the changes are applied to committed state.
 *
 * @param afterConflictCheck the hook, or null to remove it
 */
public void setAfterConflictCheck(Runnable afterConflictCheck) {
  internalAfterConflictCheck = afterConflictCheck;
}
/**
 * Installs the testing hook that commit() runs after the transaction changes
 * have been applied to committed state locally, before the commit message is
 * built and before local locks are released.
 *
 * @param afterApplyChanges the hook, or null to remove it
 */
public void setAfterApplyChanges(Runnable afterApplyChanges) {
  internalAfterApplyChanges = afterApplyChanges;
}
/**
 * Installs the testing hook that commit() runs right after the local locks
 * have been released. In commit() this happens in the finally block that
 * follows applying the changes and sending the commit message.
 *
 * @param afterReleaseLocalLocks the hook, or null to remove it
 */
public void setAfterReleaseLocalLocks(Runnable afterReleaseLocalLocks) {
  internalAfterReleaseLocalLocks = afterReleaseLocalLocks;
}
/**
 * Installs the testing hook intended to run once for each recipient (aka
 * Far Sider) of commit data, prior to actually sending the data, and before
 * the hook set with <code>setAfterIndividualSend</code>. The hook is not run
 * by the code visible in this class; the field is package scoped so that
 * TXCommitMessage can use it.
 *
 * @param duringIndividualSend the hook, or null to remove it
 */
public void setDuringIndividualSend(Runnable duringIndividualSend) {
  internalDuringIndividualSend = duringIndividualSend;
}
/**
 * Installs the testing hook intended to run once after all the commit data
 * has been sent to each recipient but before the "commit process" message is
 * sent (which only happens when there are regions with Distributed Ack
 * scope). The hook is not run by the code visible in this class; the field
 * is package scoped so that TXCommitMessage can use it.
 *
 * @param afterIndividualSend the hook, or null to remove it
 */
public void setAfterIndividualSend(Runnable afterIndividualSend) {
  internalAfterIndividualSend = afterIndividualSend;
}
/**
 * Installs the testing hook intended to run once for each recipient (aka
 * Far Sider) of the "commit process" message (only recipients with
 * Distributed Ack regions), prior to actually sending that message. The hook
 * is not run by the code visible in this class; the field is package scoped
 * so that TXCommitMessage can use it.
 *
 * @param duringIndividualCommitProcess the hook, or null to remove it
 */
public void setDuringIndividualCommitProcess(Runnable duringIndividualCommitProcess) {
  internalDuringIndividualCommitProcess = duringIndividualCommitProcess;
}
/**
 * Sets the internal callback which is run once after all the "commit
 * process" messages (only for recipients with Distributed Ack regions) have
 * been sent but before the callback installed with
 * <code>setAfterSend</code> has been called. The value is stored as-is,
 * overwriting any earlier one.
 *
 * @param afterIndividualCommitProcess the callback to store
 */
public void setAfterIndividualCommitProcess(Runnable afterIndividualCommitProcess) {
this.internalAfterIndividualCommitProcess = afterIndividualCommitProcess;
}
/**
 * Sets the internal callback which is run after all data has been sent (for
 * Distributed scope regions) and any acknowledgements have been received
 * (for Distributed Ack scope regions) but before the transaction has been
 * cleaned up. The value is stored as-is, overwriting any earlier one.
 *
 * @param afterSend the callback to store
 */
public void setAfterSend(Runnable afterSend) {
this.internalAfterSend = afterSend;
}
/**
 * Sets the internal callback which is run after the commit message is
 * formed but before it is sent. The value is stored as-is, overwriting any
 * earlier one.
 *
 * @param r the callback to store
 */
public void setBeforeSend(Runnable r) {
this.internalBeforeSend = r;
}
/**
 * Returns the cache reported by the transaction manager of this state's
 * proxy.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getCache()
 */
public Cache getCache() {
return this.proxy.getTxMgr().getCache();
}
/**
 * Returns the key set of this state's <code>regions</code> map. No copy is
 * made here; the result is whatever <code>keySet()</code> hands back.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getRegions()
 */
public Collection<LocalRegion> getRegions() {
return this.regions.keySet();
}
/**
 * Resolves the data region that is written for <code>entryKey</code> and
 * returns the transactional region state that <code>writeRegion</code>
 * yields for it.
 *
 * @param localRegion the region the operation was issued against
 * @param entryKey key information used to pick the data region
 * @return the result of <code>writeRegion</code> for the resolved region
 */
public TXRegionState txWriteRegion(final LocalRegion localRegion, final KeyInfo entryKey) {
  return writeRegion(localRegion.getDataRegionForWrite(entryKey));
}
/**
 * Returns whatever <code>readRegion</code> yields for the given region.
 * Callers in this class treat a null result as "no transactional state for
 * that region yet".
 *
 * @param localRegion the region to look up
 */
public TXRegionState txReadRegion(LocalRegion localRegion)
{
return readRegion(localRegion);
}
/**
 * Convenience form of the five-argument <code>txWriteEntry</code> that
 * passes null as the expected old value. An EntryNotFoundException coming
 * out of that call is not propagated as such; it is wrapped in an
 * InternalGemFireException.
 */
final TXEntryState txWriteEntry(LocalRegion region, EntryEventImpl event,
boolean ifNew, boolean requireOldValue) {
try {
return txWriteEntry(region, event, ifNew, requireOldValue, null);
} catch (EntryNotFoundException e) {
// Wrapped with the original exception kept as the cause.
throw new InternalGemFireException("caught unexpected exception", e);
}
}
/**
 * Reads the transactional entry for the event's key (creating it when that
 * is permitted) and marks it as written by this transaction.
 *
 * @param region the region the event targets
 * @param event the write event; may have its old value filled in
 * @param ifNew only write the entry if it currently does not exist
 * @param requireOldValue if true, the entry's pending value is set as the
 *        event's old value whenever the entry exists locally, even when
 *        ifNew is set (this is needed for putIfAbsent)
 * @param expectedOldValue the required old value or null
 * @return the tx entry; <code>ENTRY_EXISTS</code> when ifNew is set and the
 *         entry exists locally; or null when no entry was produced although
 *         creation was allowed
 * @throws EntryNotFoundException when the operation may not create an entry
 *         and none exists (also propagated from <code>txReadEntry</code>)
 */
final TXEntryState txWriteEntry(LocalRegion region,
    EntryEventImpl event,
    boolean ifNew,
    boolean requireOldValue,
    Object expectedOldValue)
    throws EntryNotFoundException {
  // Asif: a delta arriving in a sqlf system implies an update, so the old
  // value must already exist and there is no point in creating a TxEntry.
  // Likewise replace(K,V) and replace(K,V,V) cannot create an entry.
  final boolean creationForbidden =
      (event.hasDelta() && region.getGemFireCache().isSqlfSystem())
      || event.getOperation() == Operation.REPLACE;
  final boolean createIfAbsent = !creationForbidden;

  final TXEntryState txEntry = txReadEntry(event.getKeyInfo(), region, true,
      expectedOldValue, createIfAbsent);

  if (txEntry == null) {
    if (creationForbidden) {
      throw new EntryNotFoundException("No previously created Entry to be updated");
    }
    return null;
  }

  if (requireOldValue && txEntry.existsLocally()) {
    event.setOldValue(txEntry.getNearSidePendingValue(), true);
  }

  final boolean present = txEntry.existsLocally();
  if (!present && event.getOperation() == Operation.REPLACE) {
    throw new EntryNotFoundException("No previously created Entry to be updated");
  }
  if (present && ifNew) {
    // "ifNew" was requested, so tell the caller that the entry already
    // exists in tx state or committed state.
    return ENTRY_EXISTS;
  }

  txEntry.updateForWrite(nextModSerialNum());
  return txEntry;
}
/**
 * Applies a put to the transactional state. This version takes a
 * ConcurrentMap-style <code>expectedOldValue</code>: when it is not null it
 * must match the current value of the entry, otherwise false is returned
 * (or, in a sqlf system, the EntryNotFoundException is rethrown).
 *
 * @param event the put event
 * @param ifNew only put if the entry does not exist yet
 * @param requireOldValue whether the old value must be set in the event
 * @param checkResources whether to run the region's memory-threshold check
 * @param expectedOldValue the required old value or null
 * @return the outcome of the put; it is also recorded against the event
 */
public boolean txPutEntry(final EntryEventImpl event,
    boolean ifNew, boolean requireOldValue,
    boolean checkResources, Object expectedOldValue) {
  final LocalRegion targetRegion = event.getRegion();
  if (checkResources && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
    targetRegion.checkIfAboveThreshold(event);
  }

  if (bridgeContext == null) {
    bridgeContext = event.getContext();
  }

  if (hasSeenEvent(event)) {
    return getRecordedResult(event);
  }

  // if requireOldValue then oldValue gets set in event
  // (even if ifNew and entry exists)
  // !!!:ezoerner:20080813 need to handle ifOld for transactional on
  // PRs when PRs become transactional
  boolean outcome = false;
  try {
    final TXEntryState txEntry =
        txWriteEntry(targetRegion, event, ifNew, requireOldValue, expectedOldValue);
    if (txEntry != TXState.ENTRY_EXISTS) {
      outcome = txEntry.basicPut(event, ifNew, isOriginRemoteForEvents());
    }
  } catch (EntryNotFoundException e) {
    if (targetRegion.getCache().isSqlfSystem()) {
      // Asif: sqlfabric relies on the exception for a transactional update
      // of a non existent row, so let it through.
      throw e;
    }
    // otherwise the put simply reports failure; outcome is still false
  } finally {
    recordEventAndResult(event, outcome);
  }
  return outcome;
}
/**
 * Reports whether the key has a valid value as seen by this transaction.
 * Falls back to the region's non-transactional check when no tx entry is
 * available.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#containsValueForKey(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public boolean containsValueForKey(KeyInfo keyInfo, LocalRegion region) {
  final TXEntryState txEntry =
      txReadEntry(keyInfo, region, true, true/*create txEntry if absent*/);
  if (txEntry == null) {
    return region.nonTXContainsValueForKey(keyInfo);
  }
  // Note that we don't consult this.getDataPolicy().isProxy() here because
  // in this context we don't want proxies to pretend they have a value.
  final boolean treatAsProxy = false;
  return txEntry.isLocallyValid(treatAsProxy);
}
/**
 * Destroys an existing entry within this transaction. Events that were
 * already seen are ignored. When the tx entry reports the destroy as done,
 * the entry's user attribute is removed from the region state and the event
 * is recorded.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#destroyExistingEntry(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, java.lang.Object)
 */
public void destroyExistingEntry(final EntryEventImpl event, final boolean cacheWrite, Object expectedOldValue) {
  if (bridgeContext == null) {
    bridgeContext = event.getContext();
  }
  if (hasSeenEvent(event)) {
    return;
  }

  final TXEntryState txEntry = txWriteExistingEntry(event, expectedOldValue);
  final LocalRegion eventRegion = event.getRegion();
  if (!txEntry.destroy(event, cacheWrite, isOriginRemoteForEvents())) {
    return;
  }

  final Object destroyedKey = event.getKey();
  final LocalRegion dataRegion = eventRegion.getDataRegionForRead(event.getKeyInfo());
  txReadRegion(dataRegion).rmEntryUserAttr(destroyedKey);
  recordEvent(event);
}
/**
 * Invalidates an existing entry within this transaction and records the
 * event. Events that were already seen are ignored. With assertions enabled
 * the call requires <code>invokeCallbacks</code> to be true and
 * <code>forceNewEntry</code> to be false.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#invalidateExistingEntry(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
 */
public void invalidateExistingEntry(final EntryEventImpl event, boolean invokeCallbacks, boolean forceNewEntry) {
  if (bridgeContext == null) {
    bridgeContext = event.getContext();
  }
  if (hasSeenEvent(event)) {
    return;
  }

  // no expected old value applies to an invalidate
  final TXEntryState txEntry = txWriteExistingEntry(event, null);
  assert invokeCallbacks && !forceNewEntry;
  txEntry.invalidate(event);
  recordEvent(event);
}
/**
 * Write an existing entry. This form takes an expectedOldValue which, if
 * not null, must be equal to the current value of the entry. If it is not,
 * an EntryNotFoundException is thrown.
 *
 * @param event the destroy/invalidate style event being applied
 * @param expectedOldValue the required current value, or null for none
 * @return the tx entry object
 * @throws EntryNotFoundException if the entry does not exist locally and
 *         the proxy-region exception described below does not apply
 */
private TXEntryState txWriteExistingEntry(final EntryEventImpl event, Object expectedOldValue)
    throws EntryNotFoundException {
  assert !event.isExpiration();
  final Object entryKey = event.getKey();
  final LocalRegion region = event.getRegion();
  final Operation op = event.getOperation();
  final TXEntryState txEntry = txReadEntry(event.getKeyInfo(), region, true,
      expectedOldValue, true/*create txEntry if absent*/);
  assert txEntry != null;

  if (!txEntry.existsLocally()) {
    // Distributed operations on proxy regions need to be done even if the
    // entry does not exist locally, but only if we don't already have a tx
    // operation (once we have an op we honor existsLocally since the tx has
    // storage unlike the proxy). No EntryNotFoundException in that case.
    if (region.isProxy() && !op.isLocal() && !txEntry.hasOp()) {
      txEntry.updateForWrite(nextModSerialNum());
      return txEntry;
    }
    throw new EntryNotFoundException(entryKey.toString());
  }

  // Invalidating an already invalid entry is ignored.
  final boolean alreadyInvalid = op.isInvalidate()
      && Token.isInvalid(txEntry.getValueInVM(entryKey));
  if (!alreadyInvalid) {
    txEntry.updateForWrite(nextModSerialNum());
  }
  return txEntry;
}
/**
 * Returns a transactional view of the entry, or null when the entry does
 * not exist locally from this transaction's point of view. The
 * <code>allowTombstones</code> argument is not consulted here.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getEntry(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public Entry getEntry(final KeyInfo keyInfo, final LocalRegion region, boolean allowTombstones) {
  final TXEntryState txEntry =
      txReadEntry(keyInfo, region, true, true/*create txEntry if absent*/);
  final boolean visible = txEntry != null && txEntry.existsLocally();
  return visible ? new TXEntry(region, keyInfo, getProxy()) : null;
}
/**
 * Equivalent to <code>getEntry(keyInfo, localRegion, false)</code>.
 */
public Entry accessEntry(KeyInfo keyInfo, LocalRegion localRegion) {
return getEntry(keyInfo, localRegion, false);
}
/** Returns the <code>proxy</code> field of this state. */
private TXStateInterface getProxy() {
return this.proxy;
}
/**
 * Reads the transactional entry for a key without any expected-old-value
 * check: after the cache's cancel criterion has been consulted, the work is
 * delegated to the five-argument form with a null expectedOldValue.
 *
 * @param keyInfo key information of the entry to read
 * @param localRegion the region the read is issued against
 * @param rememberRead true if the value read from committed state
 * needs to be remembered in tx state for repeatable read.
 * @param createIfAbsent should a transactional entry be created if not present.
 * Used by sql fabric system
 * @return a txEntryState or null if the entry doesn't exist in the transaction and/or committed state.
 */
public TXEntryState txReadEntry(KeyInfo keyInfo, LocalRegion localRegion,
boolean rememberRead, boolean createIfAbsent) {
// EntryNotFoundException can be expected in case of sqlfabric and should
// not be caught.
localRegion.cache.getCancelCriterion().checkCancelInProgress(null);
return txReadEntry(keyInfo, localRegion, rememberRead, null, createIfAbsent);
}
/**
 * This form of txReadEntry takes a concurrent-map argument,
 * expectedOldValue. If this parameter is not null it must match the current
 * value of the entry or an EntryNotFoundException is thrown.
 *
 * @param keyInfo key information of the entry to read
 * @param localRegion the region the read is issued against
 * @param rememberRead whether a tx entry reflecting committed state should
 *        be created when the transaction has none yet
 * @param expectedOldValue the required current value, or null for none
 * @param createIfAbsent passed on to <code>createReadEntry</code>
 * @return the tx entry, or null when none was found or created
 * @throws EntryNotFoundException when expectedOldValue does not match
 */
protected TXEntryState txReadEntry(KeyInfo keyInfo, LocalRegion localRegion,
    boolean rememberRead, Object expectedOldValue, boolean createIfAbsent)
    throws EntryNotFoundException {
  final LocalRegion dataRegion = localRegion.getDataRegionForRead(keyInfo);
  TXRegionState regionState = txReadRegion(dataRegion);
  TXEntryState entry = (regionState == null) ? null : regionState.readEntry(keyInfo.getKey());

  if (entry == null && rememberRead) {
    // to support repeatable read create a tx entry that reflects current
    // committed state
    if (regionState == null) {
      regionState = txWriteRegion(localRegion, keyInfo);
    }
    entry = localRegion.createReadEntry(regionState, keyInfo, createIfAbsent);
  }

  if (entry == null) {
    // Nothing in tx state and nothing was created for the read.
    if (regionState != null) {
      regionState.cleanupNonDirtyEntries(localRegion);
    }
    // A null expectedOldValue means the caller expected non-existence. If
    // the caller passed in null as expected value it arrives here as
    // Token.INVALID, which is also acceptable for a missing entry.
    if (expectedOldValue != null && !Token.isInvalid(expectedOldValue)) {
      throw new EntryNotFoundException(LocalizedStrings.AbstractRegionMap_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString());
    }
    return null;
  }

  if (expectedOldValue != null) {
    final Object currentValue = entry.getNearSidePendingValue();
    if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, currentValue, localRegion)) {
      regionState.cleanupNonDirtyEntries(localRegion);
      throw new EntryNotFoundException(LocalizedStrings.AbstractRegionMap_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString());
    }
  }
  return entry;
}
/**
 * Returns the value of the key as seen by this transaction. When a tx entry
 * is available its value is used (run through the region's conditional copy
 * unless <code>disableCopyOnRead</code> is set); otherwise the region's own
 * <code>getDeserializedValue</code> is consulted.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
 */
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
  final TXEntryState txEntry =
      txReadEntry(keyInfo, localRegion, true, true/*create txEntry if absent*/);
  if (txEntry == null) {
    return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
  }
  final Object txValue = txEntry.getValue(keyInfo, localRegion, preferCD);
  return disableCopyOnRead ? txValue : localRegion.conditionalCopy(txValue);
}
/**
 * Returns the value for the key as seen by this transaction. When the tx
 * entry's pending value is null or an invalid/removed token, the value is
 * looked up through <code>findObject</code> (which is called with
 * returnTombstones fixed to false). Without a tx entry the partitioned
 * region's data store is read directly.
 *
 * @param localRegion the region to read from; must be a PartitionedRegion
 *        on the path where no tx entry is available
 * @param keyInfo key information of the entry to read
 * @param doNotLockEntry only passed on to the data store read
 * @param requestingClient passed on to <code>findObject</code>
 * @param clientEvent passed on to <code>findObject</code>
 * @param returnTombstones only passed on to the data store read
 * @param allowReadFromHDFS passed on to both lookups
 * @return the pending, found, or locally stored value
 * @throws DataLocationException declared for the data store read
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
 */
@Retained
public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
    boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
  TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/*create txEntry if absent*/);
  if (tx != null) {
    Object val = tx.getPendingValue();
    if (val == null || Token.isInvalidOrRemoved(val)) {
      // isCreate is true unless the pending value is exactly Token.INVALID
      val = findObject(keyInfo, localRegion, val != Token.INVALID,
          true, val, false, false, requestingClient, clientEvent, false, allowReadFromHDFS);
    }
    return val;
  } else {
    // rememberRead is always true for now,
    // so we should never come here
    assert localRegion instanceof PartitionedRegion;
    PartitionedRegion pr = (PartitionedRegion) localRegion;
    return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, returnTombstones, allowReadFromHDFS);
  }
}
/**
 * Returns the region size adjusted by this transaction's entry-count
 * modification for the region, never going below zero.
 *
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#entryCount(com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public int entryCount(LocalRegion localRegion) {
  int count = localRegion.getRegionSize();
  final TXRegionState regionState = txReadRegion(localRegion);
  if (regionState != null) {
    count += regionState.entryCountMod();
  }
  // This is to work around bug #40946: other threads can destroy all the
  // keys, and so our entryModCount can bring us below 0.
  return Math.max(count, 0);
}
/**
 * Reports whether the key exists as seen by this transaction, falling back
 * to the region's non-transactional check when no tx entry is available.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#containsKey(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public boolean containsKey(KeyInfo keyInfo, LocalRegion localRegion) {
  final TXEntryState txEntry =
      txReadEntry(keyInfo, localRegion, true, true/*create txEntry if absent*/);
  return (txEntry != null)
      ? txEntry.existsLocally()
      : localRegion.nonTXContainsKey(keyInfo);
}
/**
 * Returns the in-VM value of the key as seen by this transaction; when no
 * tx entry is available the region's non-transactional lookup is used.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getValueInVM(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
 */
@Retained
public Object getValueInVM(KeyInfo keyInfo, LocalRegion localRegion,
    boolean rememberRead) {
  final TXEntryState txEntry =
      txReadEntry(keyInfo, localRegion, rememberRead, true/*create txEntry if absent*/);
  if (txEntry == null) {
    return localRegion.nonTXbasicGetValueInVM(keyInfo);
  }
  return txEntry.getValueInVM(keyInfo);
}
/**
 * Validates the event's delta and then hands the put to
 * <code>txPutEntry</code> with resource checking enabled. The
 * <code>ifOld</code>, <code>lastModified</code> and
 * <code>overwriteDestroyed</code> arguments are not used here.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#putEntry(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
 */
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) {
validateDelta(event);
return txPutEntry(event, ifNew, requireOldValue, true, expectedOldValue);
}
/**
 * Rejects delta events on regions that have cloning disabled.
 *
 * @param event the event to inspect
 * @throws UnsupportedOperationInTransactionException when the event carries
 *         delta bytes and the region's attributes report cloning as disabled
 */
private void validateDelta(EntryEventImpl event) {
  if (event.getDeltaBytes() == null) {
    return;
  }
  if (event.getRegion().getAttributes().getCloningEnabled()) {
    return;
  }
  throw new UnsupportedOperationInTransactionException(
      LocalizedStrings.TXState_DELTA_WITHOUT_CLONING_CANNOT_BE_USED_IN_TX
          .toLocalizedString());
}
/**
 * Always returns true for this class.
 * NOTE(review): the previous comment pointed at
 * InternalDataView#isStatsDeferred(), which does not match this method's
 * name; confirm the interface method.
 */
public boolean isDeferredStats() {
return true;
}
/**
 * Delegates to <code>r.findObjectInSystem</code>, passing this state as the
 * third argument and all other arguments through unchanged.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
 */
public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
}
/**
 * Reads the tx entry for the key and reports whether an iterator should
 * skip it.
 *
 * @return true when a tx entry is available and it does not exist locally
 *         (it was destroyed by the transaction); false otherwise
 */
private boolean readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion,
    boolean rememberReads) {
  final TXEntryState txEntry =
      txReadEntry(keyInfo, localRegion, rememberReads, true/*create txEntry if absent*/);
  // A destroyed entry makes the caller move on to the next key
  // (fix for bug 34583).
  return txEntry != null && !txEntry.existsLocally();
}
/**
 * Produces the entry an iterator should hand out for <code>curr</code>.
 * For a partitioned region whose bucket primary is not this member, the
 * transaction is suspended around a non-transactional entry lookup.
 * Otherwise a TXEntry is returned, or null when the transaction has
 * destroyed the key.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getEntryForIterator(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, boolean)
 */
public Object getEntryForIterator(KeyInfo curr, LocalRegion currRgn,
    boolean rememberReads, boolean allowTombstones) {
  if (currRgn instanceof PartitionedRegion) {
    final PartitionedRegion partitioned = (PartitionedRegion) currRgn;
    final boolean primaryIsLocal = partitioned.getBucketPrimary(curr.getBucketId())
        .equals(partitioned.cache.getMyId());
    if (!primaryIsLocal) {
      // to fix bug 47893 suspend the tx before calling nonTXGetEntry
      final TXManagerImpl txManager = partitioned.getGemFireCache().getTXMgr();
      final TransactionId suspendedTx = txManager.suspend();
      try {
        return partitioned.nonTXGetEntry(curr, false, allowTombstones);
      } finally {
        txManager.resume(suspendedTx);
      }
    }
  }

  if (readEntryAndCheckIfDestroyed(curr, currRgn, rememberReads)) {
    return null;
  }
  // need to create KeyInfo since higher level iterator may reuse KeyInfo
  final KeyInfo keyCopy =
      new KeyInfo(curr.getKey(), curr.getCallbackArg(), curr.getBucketId());
  return new TXEntry(currRgn, keyCopy, proxy, rememberReads);
}
/**
 * Returns the key an iterator should hand out for <code>curr</code>, or
 * null when the transaction has destroyed it. The
 * <code>allowTombstones</code> argument is not consulted here.
 *
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#getKeyForIterator(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
 */
public Object getKeyForIterator(KeyInfo curr, LocalRegion currRgn,
    boolean rememberReads, boolean allowTombstones) {
  final boolean destroyedInTx = readEntryAndCheckIfDestroyed(curr, currRgn, rememberReads);
  return destroyedInTx ? null : curr.getKey();
}
/**
 * Collects the keys of entries created by this transaction for the region.
 * For a partitioned region the keys of all bucket region states belonging
 * to it are gathered (possibly yielding an empty set). For any other region
 * the result is null when the transaction has no state for it.
 *
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#getAdditionalKeysForIterator(com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public Set getAdditionalKeysForIterator(LocalRegion currRgn) {
  if (!(currRgn instanceof PartitionedRegion)) {
    final TXRegionState regionState = txReadRegion(currRgn);
    if (regionState == null) {
      return null;
    }
    final HashSet createdKeys = new HashSet();
    regionState.fillInCreatedEntryKeys(createdKeys);
    return createdKeys;
  }

  final HashSet createdKeys = new HashSet();
  for (TXRegionState candidate : this.regions.values()) {
    if (!(candidate instanceof TXBucketRegionState)) {
      continue;
    }
    final TXBucketRegionState bucketState = (TXBucketRegionState) candidate;
    if (bucketState.getPartitionedRegion() == currRgn) {
      bucketState.fillInCreatedEntryKeys(createdKeys);
    }
  }
  return createdKeys;
}
/**
 * Returns true only when this state is in progress and
 * <code>otherState</code> is this very instance (identity comparison).
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#isInProgressAndSameAs(com.gemstone.gemfire.internal.cache.TXStateInterface)
 */
public boolean isInProgressAndSameAs(TXStateInterface otherState) {
return isInProgress() && otherState == this;
}
/**
 * Marks the event as remotely originated and hands the put to
 * <code>txPutEntry</code> with resource checking enabled. The
 * <code>ifOld</code>, <code>lastModified</code> and
 * <code>overwriteDestroyed</code> arguments are not used here.
 *
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#putEntryOnRemote(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
 */
public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
boolean ifOld, Object expectedOldValue, boolean requireOldValue,
long lastModified, boolean overwriteDestroyed)
throws DataLocationException {
/*
* Need to flip OriginRemote to true because it is certain that this came from a remote TxStub
*/
event.setOriginRemote(true);
return txPutEntry(event, ifNew, requireOldValue, true, expectedOldValue);
}
/** Always returns true for this class. */
public boolean isFireCallbacks() {
return true;
}
/**
 * Returns true when the <code>onBehalfOfRemoteStub</code> flag is set or
 * the proxy reports that it acts on behalf of a client.
 */
public boolean isOriginRemoteForEvents() {
return onBehalfOfRemoteStub || this.proxy.isOnBehalfOfClient();
}
/**
 * Marks the event as remotely originated and then performs
 * <code>destroyExistingEntry</code> with the same arguments.
 */
public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) throws DataLocationException {
event.setOriginRemote(true);
destroyExistingEntry(event, cacheWrite, expectedOldValue);
}
/**
 * Marks the event as remotely originated and then performs
 * <code>invalidateExistingEntry</code> with the same arguments.
 */
public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) throws DataLocationException {
event.setOriginRemote(true);
invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
}
/**
 * Always throws: region destroy is reported as not supported in a
 * transaction.
 *
 * @throws UnsupportedOperationInTransactionException unconditionally
 */
public void checkSupportsRegionDestroy()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.TXState_REGION_DESTROY_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
}
/**
 * Always throws: region invalidate is reported as not supported in a
 * transaction.
 *
 * @throws UnsupportedOperationInTransactionException unconditionally
 */
public void checkSupportsRegionInvalidate()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.TXState_REGION_INVALIDATE_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
}
/**
 * Always throws: region clear is reported as not supported in a
 * transaction.
 *
 * @throws UnsupportedOperationInTransactionException unconditionally
 */
@Override
public void checkSupportsRegionClear()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.TXState_REGION_CLEAR_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
}
/**
 * Returns the keys of one bucket by asking the partitioned region directly.
 * The region argument is cast to PartitionedRegion without a prior check.
 *
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#getBucketKeys(com.gemstone.gemfire.internal.cache.LocalRegion, int)
 */
public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTombstones) {
  return ((PartitionedRegion) localRegion).getBucketKeys(bucketId, allowTombstones);
}
/**
 * Looks up the entry through <code>getEntry</code> and wraps it in an
 * EntrySnapshot. The region argument is cast to PartitionedRegion without a
 * prior check.
 *
 * @throws EntryNotFoundException when <code>getEntry</code> yields null
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#getEntryOnRemote(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public Entry getEntryOnRemote(KeyInfo key, LocalRegion localRegion, boolean allowTombstones)
    throws DataLocationException {
  final PartitionedRegion partitioned = (PartitionedRegion) localRegion;
  final Region.Entry txView = getEntry(key, partitioned, allowTombstones);
  if (txView == null) {
    throw new EntryNotFoundException(LocalizedStrings.PartitionedRegionDataStore_ENTRY_NOT_FOUND.toLocalizedString());
  }
  final NonLocalRegionEntry snapshotSource = new NonLocalRegionEntry(txView, localRegion);
  final LocalRegion dataRegion = localRegion.getDataRegionForRead(key);
  return new EntrySnapshot(snapshotSource, dataRegion, (LocalRegion) txView.getRegion(), allowTombstones);
}
/**
 * Returns the lock reported by the proxy.
 * NOTE(review): the previous comment pointed at
 * TXStateInterface#getSemaphore(), which does not match this method's name;
 * confirm the interface method.
 */
public ReentrantLock getLock() {
return proxy.getLock();
}
/**
 * Returns the region's own keys for iteration; nothing from this
 * transaction's state is added here.
 *
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#getRegionKeysForIteration(com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public Set getRegionKeysForIteration(LocalRegion currRegion) {
return currRegion.getRegionKeysForIteration();
}
/** Always returns true for this class. */
public boolean isRealDealLocal()
{
return true;
}
/** Always returns null for this class. */
public InternalDistributedMember getOriginatingMember() {
/*
* State will never fwd on to other nodes so this is not relevant
*/
return null;
}
/** Always returns false for this class. */
public boolean isMemberIdForwardingRequired() {
/*
* State will never fwd on to other nodes so this is not relevant
*/
return false;
}
/** Returns the current value of the <code>commitMessage</code> field. */
public TXCommitMessage getCommitMessage() {
return commitMessage;
}
/**
 * Applies every entry of a putAll to the region under the region's bulk-op
 * synchronization, adding the key of each successful put to
 * <code>successfulPuts</code>. For TX this needs to be a PR passed in as
 * region: a bucket region is replaced by its partitioned region first.
 *
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#postPutAll(com.gemstone.gemfire.internal.cache.DistributedPutAllOperation, java.util.Map, com.gemstone.gemfire.internal.cache.LocalRegion)
 */
public void postPutAll(final DistributedPutAllOperation putallOp, final VersionedObjectList successfulPuts, LocalRegion reg) {
  final LocalRegion theRegion = (reg instanceof BucketRegion)
      ? ((BucketRegion) reg).getPartitionedRegion()
      : reg;

  // Don't fire events here. We are on the data store, so nothing needs to
  // be distributed now; commit will push the changes out. The puts only
  // need to end up in the tx state.
  final Runnable applyPuts = new Runnable() {
    public void run() {
      final InternalDistributedMember self =
          theRegion.getDistributionManager().getDistributionManagerId();
      for (int idx = 0; idx < putallOp.putAllDataSize; ++idx) {
        final EntryEventImpl entryEvent = PutAllPRMessage.getEventFromEntry(theRegion, self, self, idx, putallOp.putAllData, false, putallOp.getBaseEvent().getContext(), false, !putallOp.getBaseEvent().isGenerateCallbacks(), false);
        try {
          entryEvent.setPutAllOperation(putallOp);
          final boolean stored = theRegion.basicPut(entryEvent, false, false, null, false);
          if (stored) {
            successfulPuts.addKeyAndVersion(putallOp.putAllData[idx].key, null);
          }
        } finally {
          entryEvent.release();
        }
      }
    }
  };
  theRegion.syncBulkOp(applyPuts, putallOp.getBaseEvent().getEventId());
}
/**
 * Applies every entry of a removeAll to the region under the region's
 * bulk-op synchronization. A bucket region is replaced by its partitioned
 * region first. Each key is added to <code>successfulOps</code> whether or
 * not the entry was found. The per-entry event is released once it has
 * been applied, mirroring what <code>postPutAll</code> does.
 *
 * @param op the removeAll operation carrying the entries
 * @param successfulOps receives the key of every processed entry
 * @param reg the region (or bucket region) the operation targets
 */
@Override
public void postRemoveAll(final DistributedRemoveAllOperation op, final VersionedObjectList successfulOps, LocalRegion reg) {
  final LocalRegion theRegion;
  if (reg instanceof BucketRegion) {
    theRegion = ((BucketRegion) reg).getPartitionedRegion();
  } else {
    theRegion = reg;
  }
  /*
   * Don't fire events here.
   * We are on the data store, we don't need to do anything here. Commit will push them out.
   * We need to put this into the tx state.
   */
  theRegion.syncBulkOp(new Runnable() {
    public void run() {
      InternalDistributedMember myId = theRegion.getDistributionManager().getDistributionManagerId();
      for (int i = 0; i < op.removeAllDataSize; ++i) {
        EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion, myId, myId, i, op.removeAllData, false, op.getBaseEvent().getContext(), false, !op.getBaseEvent().isGenerateCallbacks());
        try {
          ev.setRemoveAllOperation(op);
          theRegion.basicDestroy(ev, true/* should we invoke cacheWriter? */, null);
        } catch (EntryNotFoundException ignore) {
          // a missing entry is deliberately tolerated; the key is still
          // reported below
        } finally {
          // always give the event back, even when the destroy fails
          ev.release();
        }
        successfulOps.addKeyAndVersion(op.removeAllData[i].key, null);
      }
    }
  }, op.getBaseEvent().getEventId());
}
/** Does nothing in this class. */
public void suspend() {
// no special tasks to perform
}
/** Does nothing in this class. */
public void resume() {
// no special tasks to perform
}
/** Does nothing in this class; all arguments are ignored. */
public void recordTXOperation(ServerRegionDataAccess region, ServerRegionOperation op, Object key, Object arguments[]) {
// no-op here
}
/**
 * Does nothing in this class; the event is ignored and no exception is
 * thrown from here.
 */
@Override
public void updateEntryVersion(EntryEventImpl event)
throws EntryNotFoundException {
// Do nothing. Not applicable for transactions.
}
/** Always returns true for this class. */
@Override
public boolean isTxState() {
return true;
}
/** Always returns false for this class. */
@Override
public boolean isTxStateStub() {
return false;
}
/** Always returns false for this class. */
@Override
public boolean isTxStateProxy() {
return false;
}
/** Always returns false for this class. */
@Override
public boolean isDistTx() {
return false;
}
/** Always returns false for this class. */
@Override
public boolean isCreatedOnDistTxCoordinator() {
return false;
}
}