blob: ed80e75d2efb5b32e4250e5105a0e152c62c4ce4 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import java.util.*;
import java.util.Map.Entry;
import org.apache.logging.log4j.Logger;
/** TXRegionState is the entity that tracks all the changes a transaction
* has made to a region.
*
* @author Darrel Schneider
*
* @since 4.0
*
* @see TXManagerImpl
*/
public class TXRegionState {
private static final Logger logger = LogService.getLogger();
// A map of Objects (entry keys) -> TXEntryState
private final HashMap<Object, TXEntryState> entryMods;
// A map of Objects (entry keys) -> TXEntryUserAttrState
private HashMap uaMods;
private Set<InternalDistributedMember> otherMembers = null;
private TXState txState;
private LocalRegion region;
private final boolean needsRefCounts;
private boolean cleanedUp;
/*
* For Distributed Tx
* Created during precommit, to apply changes on secondaries/replicates from coordinator.
*/
private boolean createdDuringCommit;
public TXRegionState(LocalRegion r,TXState txState)
{
if (r.getPersistBackup() && !r.isMetaRegionWithTransactions() && !TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS) {
throw new UnsupportedOperationException(LocalizedStrings.TXRegionState_OPERATIONS_ON_PERSISTBACKUP_REGIONS_ARE_NOT_ALLOWED_BECAUSE_THIS_THREAD_HAS_AN_ACTIVE_TRANSACTION.toLocalizedString());
}
if (r.getScope().isGlobal()) {
throw new UnsupportedOperationException(LocalizedStrings.TXRegionState_OPERATIONS_ON_GLOBAL_REGIONS_ARE_NOT_ALLOWED_BECAUSE_THIS_THREAD_HAS_AN_ACTIVE_TRANSACTION.toLocalizedString());
}
if (r.hasServerProxy()) {
// throw new UnsupportedOperationException(LocalizedStrings.TXRegionState_OPERATIONS_ON_REGION_WITH_CLIENT_POOL_ARE_NOT_ALLOWED_BECAUSE_THIS_THREAD_HAS_AN_ACTIVE_TRANSACTION.toLocalizedString());
}
this.entryMods = new HashMap<Object, TXEntryState>();
this.uaMods = null;
this.region = r;
this.txState = txState;
this.needsRefCounts = r.isEntryEvictionPossible() || r.isEntryExpiryPossible();
r.setInUseByTransaction(true);
}
public LocalRegion getRegion() {
return region;
}
public boolean needsRefCounts() {
return this.needsRefCounts;
}
public Set getEntryKeys() {
return Collections.unmodifiableSet(this.entryMods.keySet());
}
public TXEntryState readEntry(Object entryKey) {
return this.entryMods.get(entryKey);
}
public TXEntryState createReadEntry(LocalRegion r, Object entryKey, RegionEntry re, Object vId, Object pendingValue) {
GemFireCacheImpl cache = r.getCache();
boolean isDistributed = false;
if (cache.getTxManager().getTXState() != null) {
isDistributed = cache.getTxManager().getTXState().isDistTx();
}
else {
// TXCoordinator and datanode are same
isDistributed = cache.getTxManager().isDistributed();
}
TXEntryState result = cache.getTXEntryStateFactory().createEntry(re, vId, pendingValue, entryKey, this, isDistributed);
this.entryMods.put(entryKey, result);
return result;
}
// public void rmEntry(Object entryKey, TXState txState, LocalRegion r) {
// rmEntryUserAttr(entryKey);
// TXEntryState e = (TXEntryState)this.entryMods.remove(entryKey);
// if (e != null) {
// e.cleanup(r);
// }
// if (this.uaMods == null && this.entryMods.size() == 0) {
// txState.rmRegion(r);
// }
// }
public TXEntryUserAttrState readEntryUserAttr(Object entryKey) {
TXEntryUserAttrState result = null;
if (this.uaMods != null) {
result = (TXEntryUserAttrState)this.uaMods.get(entryKey);
}
return result;
}
public TXEntryUserAttrState writeEntryUserAttr(Object entryKey, LocalRegion r) {
if (this.uaMods == null) {
this.uaMods = new HashMap();
}
TXEntryUserAttrState result = (TXEntryUserAttrState)this.uaMods.get(entryKey);
if (result == null) {
result = new TXEntryUserAttrState(r.basicGetEntryUserAttribute(entryKey));
this.uaMods.put(entryKey, result);
}
return result;
}
public void rmEntryUserAttr(Object entryKey) {
if (this.uaMods != null) {
if (this.uaMods.remove(entryKey) != null) {
if (this.uaMods.size() == 0) {
this.uaMods = null;
}
}
}
}
/**
* Returns the total number of modifications made by this transaction
* to this region's entry count. The result will have a +1 for every
* create and a -1 for every destroy.
*/
int entryCountMod() {
int result = 0;
Iterator it = this.entryMods.values().iterator();
while (it.hasNext()) {
TXEntryState es = (TXEntryState)it.next();
result += es.entryCountMod();
}
return result;
}
TXEntryState getTXEntryState(Object key) {
return this.entryMods.get(key);
}
/**
* Fills in a set of any entries created by this transaction for the provided region.
* @param ret the HashSet to fill in with key objects
*/
void fillInCreatedEntryKeys(HashSet ret) {
Iterator<Entry<Object, TXEntryState>> it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Entry<Object, TXEntryState> me = it.next();
TXEntryState txes = me.getValue();
if (txes.wasCreatedByTX()) {
ret.add(me.getKey());
}
}
}
/**
* Create a lock request on this region state and adds it to req
*/
void createLockRequest(LocalRegion r, TXLockRequest req) {
if (this.uaMods == null && this.entryMods.isEmpty()) {
return;
}
if (this.txState.logger.isDebugEnabled()) {
this.txState.logger.debug("TXRegionState.createLockRequest 1 "
+ r.getClass().getSimpleName() + " region-state=" + this);
}
if (r.getScope().isDistributed()) {
// [DISTTX] Do not take lock for RR on replicates
if (this.isCreatedDuringCommit()) {
return;
}
DistributedRegion dr = (DistributedRegion)r;
Set<InternalDistributedMember> advice = dr.getCacheDistributionAdvisor().adviseTX();
if (!advice.isEmpty()) {
this.otherMembers = advice; // remember for when it is time to distribute
}
}
if (this.txState.logger.isDebugEnabled()) {
this.txState.logger.debug("TXRegionState.createLockRequest 2");
}
//Bypass D-lock for Pr TX
boolean byPassDLock = false;
if (r instanceof BucketRegion) {
//BucketRegion br = (BucketRegion)r;
//if (br.getRedundancyLevel() < 2) {
byPassDLock = true;
//}
}
final boolean distributedTX = !byPassDLock && r.getScope().isDistributedAck();
if (this.uaMods != null
|| (!distributedTX && this.entryMods.size() > 0)) {
// need some local locks
TXRegionLockRequestImpl rlr = new TXRegionLockRequestImpl(r);
if (this.uaMods != null) {
rlr.addEntryKeys(this.uaMods.keySet());
}
if (!distributedTX && this.entryMods.size() > 0) {
rlr.addEntryKeys(getLockRequestEntryKeys());
}
if (!rlr.isEmpty()) {
req.addLocalRequest(rlr);
}
}
if (distributedTX && this.entryMods.size() > 0) {
// need some distributed locks
TXRegionLockRequestImpl rlr = new TXRegionLockRequestImpl(r);
rlr.addEntryKeys(getLockRequestEntryKeys());
if (!rlr.isEmpty()) {
req.setOtherMembers(this.otherMembers);
req.addDistributedRequest(rlr);
}
}
}
/**
* Returns a set of entry keys that this tx needs to request
* a lock for at commit time.
* @return <code>null</code> if no entries need to be locked.
*/
private Set getLockRequestEntryKeys() {
HashSet result = null;
Iterator it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
TXEntryState txes = (TXEntryState)me.getValue();
if (txes.isDirty() && !txes.isOpSearch()) {
if (result == null) {
result = new HashSet();
}
result.add(me.getKey());
}
}
return result;
}
void checkForConflicts(LocalRegion r) throws CommitConflictException {
if (this.isCreatedDuringCommit()) {
return;
}
{
Iterator it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
Object eKey = me.getKey();
TXEntryState txes = (TXEntryState)me.getValue();
txes.checkForConflict(r, eKey);
}
}
if (this.uaMods != null) {
r.checkReadiness();
Iterator it = this.uaMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
Object eKey = me.getKey();
TXEntryUserAttrState txes = (TXEntryUserAttrState)me.getValue();
txes.checkForConflict(r, eKey);
}
}
}
/**
* For each entry that is not dirty (all we did was read it)
* decrement its refcount (so it can be evicted as we apply our writes)
* and remove it from entryMods (so we don't keep iterating over it
* and se we don't try to clean it up again later).
*/
void cleanupNonDirtyEntries(LocalRegion r) {
if (!this.entryMods.isEmpty()) {
Iterator it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
//Object eKey = me.getKey();
TXEntryState txes = (TXEntryState)me.getValue();
if (txes.cleanupNonDirty(r)) {
it.remove();
}
}
}
}
void buildMessage(LocalRegion r, TXCommitMessage msg) {
try {
if (!r.getScope().isLocal() && !this.entryMods.isEmpty()) {
msg.startRegion(r, entryMods.size());
Iterator it = this.entryMods.entrySet().iterator();
Set<InternalDistributedMember> newMemberSet = new HashSet<InternalDistributedMember>();
if (r.getScope().isDistributed()) {
DistributedRegion dr = (DistributedRegion) r;
msg.addViewVersion(dr, dr.getDistributionAdvisor().startOperation());
newMemberSet.addAll(dr.getCacheDistributionAdvisor().adviseTX());
}
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
Object eKey = me.getKey();
TXEntryState txes = (TXEntryState)me.getValue();
txes.buildMessage(r, eKey, msg,this.otherMembers);
if(txes.getFilterRoutingInfo()!=null) {
newMemberSet.addAll(txes.getFilterRoutingInfo().getMembers());
}
if(txes.getAdjunctRecipients()!=null) {
newMemberSet.addAll(txes.getAdjunctRecipients());
}
}
if (!newMemberSet.equals(this.otherMembers)) {
// r.getCache().getLogger().info("DEBUG: participants list has changed! bug 32999.");
// Flag the message that the lock manager needs to be updated with the new member set
msg.setUpdateLockMembers();
this.otherMembers = newMemberSet;
}
msg.finishRegion(this.otherMembers);
}
}
catch (RegionDestroyedException ex) {
// region was destroyed out from under us; after conflict checking
// passed. So act as if the region destroy happened right after the
// commit. We act this way by doing nothing; including distribution
// of this region's commit data.
}
catch (CancelException ex) {
// cache was closed out from under us; after conflict checking
// passed. So do nothing.
}
}
void buildCompleteMessage(LocalRegion r, TXCommitMessage msg) {
try {
if (!this.entryMods.isEmpty()) {
msg.startRegion(r, entryMods.size());
Iterator it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
Object eKey = me.getKey();
TXEntryState txes = (TXEntryState)me.getValue();
txes.buildCompleteMessage(r, eKey, msg,this.otherMembers);
}
msg.finishRegionComplete();
}
}
catch (RegionDestroyedException ex) {
// region was destroyed out from under us; after conflict checking
// passed. So act as if the region destroy happened right after the
// commit. We act this way by doing nothing; including distribution
// of this region's commit data.
}
catch (CancelException ex) {
// cache was closed out from under us; after conflict checking
// passed. So do nothing.
}
}
void applyChangesStart(LocalRegion r, TXStateInterface txState) {
try {
r.txLRUStart();
} catch (RegionDestroyedException ex) {
// region was destroyed out from under us; after conflict checking
// passed. So act as if the region destroy happened right after the
// commit. We act this way by doing nothing; including distribution
// of this region's commit data.
}
catch (CancelException ex) {
// cache was closed out from under us; after conflict checking
// passed. So do nothing.
}
}
void applyChangesEnd(LocalRegion r, TXStateInterface txState) {
try {
try {
if (this.uaMods != null) {
Iterator it = this.uaMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
Object eKey = me.getKey();
TXEntryUserAttrState txes = (TXEntryUserAttrState)me.getValue();
txes.applyChanges(r, eKey);
}
}
} finally {
r.txLRUEnd();
}
} catch (RegionDestroyedException ex) {
// region was destroyed out from under us; after conflict checking
// passed. So act as if the region destroy happened right after the
// commit. We act this way by doing nothing; including distribution
// of this region's commit data.
}
catch (CancelException ex) {
// cache was closed out from under us; after conflict checking
// passed. So do nothing.
}
}
void getEvents(LocalRegion r, ArrayList events, TXState txs) {
{
Iterator it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
Object eKey = me.getKey();
TXEntryState txes = (TXEntryState)me.getValue();
if (txes.isDirty() && txes.isOpAnyEvent(r)) {
events.add(txes.getEvent(r, eKey, txs));
}
}
}
}
/**
* Put all the entries this region knows about into the given "entries" list
* as instances of TXEntryStateWithRegionAndKey.
*/
void getEntries(ArrayList/*<TXEntryStateWithRegionAndKey>*/ entries, LocalRegion r) {
Iterator it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
Object eKey = me.getKey();
TXEntryState txes = (TXEntryState)me.getValue();
entries.add(new TXState.TXEntryStateWithRegionAndKey(txes, r, eKey));
}
}
void cleanup(LocalRegion r) {
if (this.cleanedUp) return;
this.cleanedUp = true;
Iterator it = this.entryMods.values().iterator();
while (it.hasNext()) {
TXEntryState es = (TXEntryState)it.next();
es.cleanup(r);
}
this.region.setInUseByTransaction(false);
}
int getChanges() {
int changes = 0;
Iterator it = this.entryMods.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
TXEntryState txes = (TXEntryState)me.getValue();
if (txes.isDirty()) {
changes++;
}
}
if (this.uaMods != null) {
changes += this.uaMods.size();
}
return changes;
}
public Map<Object,TXEntryState> getEntriesInTxForSqlFabric() {
return Collections.unmodifiableMap(this.entryMods);
}
public TXState getTXState() {
// TODO Auto-generated method stub
return txState;
}
public void close() {
for (TXEntryState e: this.entryMods.values()) {
e.close();
}
}
@Override
public String toString() {
StringBuilder str = new StringBuilder();
str.append("{").append(super.toString()).append(" ");
str.append(" ,entryMods=").append(this.entryMods);
str.append(" ,isCreatedDuringCommit=").append(this.isCreatedDuringCommit());
str.append("}");
return str.toString();
}
/**
* @return the createdDuringCommit
*/
public boolean isCreatedDuringCommit() {
return createdDuringCommit;
}
/**
* @param createdDuringCommit
* the createdDuringCommit to set
*/
public void setCreatedDuringCommit(boolean createdDuringCommit) {
this.createdDuringCommit = createdDuringCommit;
}
public boolean populateDistTxEntryStateList(
ArrayList<DistTxThinEntryState> entryStateList) {
String regionFullPath = this.getRegion().getFullPath();
try {
if (!this.entryMods.isEmpty()) {
// [DISTTX] TODO Sort this first
for (Entry<Object, TXEntryState> em : this.entryMods.entrySet()) {
Object mKey = em.getKey();
TXEntryState txes = em.getValue();
DistTxThinEntryState thinEntryState = txes.getDistTxEntryStates();
entryStateList.add(thinEntryState);
if (logger.isDebugEnabled()) {
logger.debug("TXRegionState.populateDistTxEntryStateList Added "
+ thinEntryState + " for key=" + mKey + " ,op="
+ txes.opToString() + " ,region=" + regionFullPath);
}
}
}
return true;
} catch (RegionDestroyedException ex) {
// region was destroyed out from under us; after conflict checking
// passed. So act as if the region destroy happened right after the
// commit. We act this way by doing nothing; including distribution
// of this region's commit data.
} catch (CancelException ex) {
// cache was closed out from under us; after conflict checking
// passed. So do nothing.
}
if (logger.isDebugEnabled()) {
logger
.debug("TXRegionState.populateDistTxEntryStateList Got exception for region "
+ regionFullPath);
}
return false;
}
public void setDistTxEntryStates(
ArrayList<DistTxThinEntryState> entryEventList) {
String regionFullPath = this.getRegion().getFullPath();
int entryModsSize = this.entryMods.size();
int entryEventListSize = entryEventList.size();
if (entryModsSize != entryEventListSize) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
"entry size of " + entryModsSize + " for region "
+ regionFullPath, entryEventListSize));
}
int index = 0;
// [DISTTX] TODO Sort this first
for (Entry<Object, TXEntryState> em : this.entryMods.entrySet()) {
Object mKey = em.getKey();
TXEntryState txes = em.getValue();
DistTxThinEntryState thinEntryState = entryEventList.get(index++);
txes.setDistTxEntryStates(thinEntryState);
if (logger.isDebugEnabled()) {
logger.debug("TxRegionState.setDistTxEntryStates Added "
+ thinEntryState + " for key=" + mKey + " ,op=" + txes.opToString()
+ " ,region=" + regionFullPath);
}
}
}
}