blob: 714386efcc190a5e57be15f804b0a9b2f626066f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* TXRegionState is the entity that tracks all the changes a transaction has made to a region.
*
*
* @since GemFire 4.0
*
* @see TXManagerImpl
*/
public class TXRegionState {
// Class-level logger; used for the debug output of the DistTx entry-state methods below.
private static final Logger logger = LogService.getLogger();
// A map of Objects (entry keys) -> TXEntryState
// Populated by createReadEntry; entries that were only read are removed again by
// cleanupNonDirtyEntries.
private final HashMap<Object, TXEntryState> entryMods;
// A map of Objects (entry keys) -> TXEntryUserAttrState
// Created lazily by writeEntryUserAttr and reset to null by rmEntryUserAttr once it becomes empty,
// so "null" and "no user attribute modifications" are equivalent.
private HashMap uaMods;
// Members returned by adviseTX() when the lock request was created; buildMessage and
// buildMessageForAdjunctReceivers replace it when the recipient set they compute differs.
private Set<InternalDistributedMember> otherMembers = null;
// The transaction state handed to the constructor.
private TXState txState;
// The region handed to the constructor.
private InternalRegion region;
// Computed once in the constructor: true when entry eviction or entry expiry is possible on the
// region.
private final boolean needsRefCounts;
// Set by cleanup() so that a second cleanup() call is a no-op.
private boolean cleanedUp;
/*
* For Distributed Tx Created during precommit, to apply changes on secondaries/replicates from
* coordinator.
*/
// When true, createLockRequest (for distributed scopes) and checkForConflicts return early.
private boolean createdDuringCommit;
public TXRegionState(InternalRegion r, TXState txState) {
if (r.getPersistBackup() && !r.isMetaRegionWithTransactions()
&& !TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS) {
throw new UnsupportedOperationException(
"Operations on persist-backup regions are not allowed because this thread has an active transaction");
}
if (r.getScope().isGlobal()) {
throw new UnsupportedOperationException(
"Operations on global regions are not allowed because this thread has an active transaction");
}
this.entryMods = new HashMap<Object, TXEntryState>();
this.uaMods = null;
this.region = r;
this.txState = txState;
this.needsRefCounts = r.isEntryEvictionPossible() || r.isEntryExpiryPossible();
r.setInUseByTransaction(true);
}
/**
 * @return the region this state was created for
 */
public InternalRegion getRegion() {
  return this.region;
}
/**
 * @return the value computed at construction time: {@code true} when entry eviction or entry
 *         expiry was possible on the region
 */
public boolean needsRefCounts() {
  return needsRefCounts;
}
/**
 * @return an unmodifiable view of the keys of all entries tracked by this region state
 */
public Set getEntryKeys() {
  final Set<Object> trackedKeys = this.entryMods.keySet();
  return Collections.unmodifiableSet(trackedKeys);
}
/**
 * Looks up the transactional state recorded for an entry.
 *
 * @param entryKey the key of the entry
 * @return the recorded state, or {@code null} if none is recorded for {@code entryKey}
 */
public TXEntryState readEntry(Object entryKey) {
  final TXEntryState recorded = this.entryMods.get(entryKey);
  return recorded;
}
/**
 * Creates a new {@link TXEntryState} through the cache's entry-state factory, records it under
 * {@code entryKey} (replacing any state previously recorded for that key) and returns it.
 *
 * @param r the region used to reach the cache
 * @param entryKey the key of the entry being read
 * @param re passed through to the factory
 * @param vId passed through to the factory
 * @param pendingValue passed through to the factory
 * @return the newly created entry state
 */
public TXEntryState createReadEntry(LocalRegion r, Object entryKey, RegionEntry re, Object vId,
    Object pendingValue) {
  final InternalCache cache = r.getCache();

  // Prefer the answer of the current TXState; without one, ask the manager itself
  // (TXCoordinator and datanode are same).
  final boolean distributed = cache.getTxManager().getTXState() != null
      ? cache.getTxManager().getTXState().isDistTx()
      : cache.getTxManager().isDistributed();

  final TXEntryState created = cache.getTXEntryStateFactory().createEntry(re, vId, pendingValue,
      entryKey, this, distributed);
  this.entryMods.put(entryKey, created);
  return created;
}
// public void rmEntry(Object entryKey, TXState txState, LocalRegion r) {
// rmEntryUserAttr(entryKey);
// TXEntryState e = (TXEntryState)this.entryMods.remove(entryKey);
// if (e != null) {
// e.cleanup(r);
// }
// if (this.uaMods == null && this.entryMods.size() == 0) {
// txState.rmRegion(r);
// }
// }
/**
 * Looks up the user-attribute state recorded for an entry.
 *
 * @param entryKey the key of the entry
 * @return the recorded state, or {@code null} if no user-attribute modifications exist at all or
 *         none is recorded for {@code entryKey}
 */
public TXEntryUserAttrState readEntryUserAttr(Object entryKey) {
  if (this.uaMods == null) {
    return null;
  }
  return (TXEntryUserAttrState) this.uaMods.get(entryKey);
}
/**
 * Returns the user-attribute state for {@code entryKey}, creating and recording it first when it
 * does not exist yet. The backing map is allocated on first use.
 *
 * @param entryKey the key of the entry
 * @param r the region supplying the entry's current user attribute for a newly created state
 * @return the existing or newly created user-attribute state, never {@code null}
 */
public TXEntryUserAttrState writeEntryUserAttr(Object entryKey, LocalRegion r) {
  if (this.uaMods == null) {
    this.uaMods = new HashMap();
  }

  final TXEntryUserAttrState existing = (TXEntryUserAttrState) this.uaMods.get(entryKey);
  if (existing != null) {
    return existing;
  }

  final TXEntryUserAttrState created =
      new TXEntryUserAttrState(r.basicGetEntryUserAttribute(entryKey));
  this.uaMods.put(entryKey, created);
  return created;
}
/**
 * Removes the user-attribute state recorded for {@code entryKey}, if any. When that removal
 * leaves the backing map empty the map itself is dropped.
 *
 * @param entryKey the key of the entry
 */
public void rmEntryUserAttr(Object entryKey) {
  if (this.uaMods == null) {
    return;
  }
  final boolean removed = this.uaMods.remove(entryKey) != null;
  if (removed && this.uaMods.isEmpty()) {
    this.uaMods = null;
  }
}
/**
 * Returns the total number of modifications made by this transaction to this region's entry
 * count. The result will have a +1 for every create and a -1 for every destroy.
 *
 * @return the sum of {@code entryCountMod()} over all tracked entry states
 */
int entryCountMod() {
  int total = 0;
  for (TXEntryState entryState : this.entryMods.values()) {
    total += entryState.entryCountMod();
  }
  return total;
}
/**
 * @param key the key of the entry
 * @return the entry state recorded for {@code key}, or {@code null} if there is none
 */
TXEntryState getTXEntryState(Object key) {
  final TXEntryState recorded = this.entryMods.get(key);
  return recorded;
}
/**
 * Adds to {@code ret} the key of every tracked entry whose state reports that it was created by
 * the transaction.
 *
 * @param ret the HashSet to fill in with key objects
 */
void fillInCreatedEntryKeys(HashSet ret) {
  for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
    final TXEntryState entryState = mod.getValue();
    if (!entryState.wasCreatedByTX()) {
      continue;
    }
    ret.add(mod.getKey());
  }
}
/**
 * Creates the lock requests needed for this region state and adds them to {@code req}.
 *
 * <p>
 * Nothing is added when there are neither entry nor user-attribute modifications, or when the
 * region's scope is distributed and this state was created during commit. User-attribute keys
 * always go into a local request. Entry keys go into a distributed request when the region is
 * not a {@code BucketRegion} and its scope is distributed-ack, otherwise into the local request.
 *
 * @param r the region the locks are requested for
 * @param req the lock request to add to
 */
void createLockRequest(InternalRegion r, TXLockRequest req) {
  final boolean hasUserAttrMods = this.uaMods != null;
  if (!hasUserAttrMods && this.entryMods.isEmpty()) {
    return;
  }
  if (this.txState.logger.isDebugEnabled()) {
    this.txState.logger.debug("TXRegionState.createLockRequest 1 " + r.getClass().getSimpleName()
        + " region-state=" + this);
  }

  if (r.getScope().isDistributed()) {
    // [DISTTX] Do not take lock for RR on replicates
    if (this.isCreatedDuringCommit()) {
      return;
    }
    final Set<InternalDistributedMember> advice =
        ((DistributedRegion) r).getCacheDistributionAdvisor().adviseTX();
    if (!advice.isEmpty()) {
      // remember for when it is time to distribute
      this.otherMembers = advice;
    }
  }
  if (this.txState.logger.isDebugEnabled()) {
    this.txState.logger.debug("TXRegionState.createLockRequest 2");
  }

  // Bypass D-lock for Pr TX
  final boolean byPassDLock = r instanceof BucketRegion;
  final boolean distributedTX = !byPassDLock && r.getScope().isDistributedAck();
  final boolean hasEntryMods = !this.entryMods.isEmpty();
  final boolean lockEntriesLocally = !distributedTX && hasEntryMods;

  if (hasUserAttrMods || lockEntriesLocally) {
    // need some local locks
    final TXRegionLockRequestImpl localLocks = new TXRegionLockRequestImpl(r.getCache(), r);
    if (hasUserAttrMods) {
      for (Object attrKey : this.uaMods.keySet()) {
        // add key with isEvent set to TRUE, for keep BC
        localLocks.addEntryKey(attrKey, Boolean.TRUE);
      }
    }
    if (lockEntriesLocally) {
      localLocks.addEntryKeys(getLockRequestEntryKeys());
    }
    if (!localLocks.isEmpty()) {
      req.addLocalRequest(localLocks);
    }
  }

  if (distributedTX && hasEntryMods) {
    // need some distributed locks
    final TXRegionLockRequestImpl distributedLocks =
        new TXRegionLockRequestImpl(r.getCache(), r);
    distributedLocks.addEntryKeys(getLockRequestEntryKeys());
    if (!distributedLocks.isEmpty()) {
      req.setOtherMembers(this.otherMembers);
      req.addDistributedRequest(distributedLocks);
    }
  }
}
/**
 * Collects the entries this tx needs to request a lock for at commit time: every tracked entry
 * that is dirty and is not a search operation. Each key is mapped to the result of
 * {@code isOpAnyEvent} for this state's region.
 *
 * @return the key map, or <code>null</code> if no entries need to be locked.
 */
private Map getLockRequestEntryKeys() {
  HashMap<Object, Boolean> lockKeys = null;
  for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
    final TXEntryState entryState = mod.getValue();
    if (!entryState.isDirty() || entryState.isOpSearch()) {
      continue;
    }
    if (lockKeys == null) {
      // allocated lazily so that "nothing to lock" is reported as null
      lockKeys = new HashMap<Object, Boolean>();
    }
    lockKeys.put(mod.getKey(), entryState.isOpAnyEvent(this.region));
  }
  return lockKeys;
}
/**
 * Asks every tracked entry state, and then every user-attribute state, to check itself for
 * conflicts against region {@code r}. Does nothing when this state was created during commit.
 * The region's readiness is checked before the user-attribute states are examined.
 *
 * @param r the region to check against
 * @throws CommitConflictException declared for conflicts reported by the individual states
 */
void checkForConflicts(InternalRegion r) throws CommitConflictException {
  if (this.isCreatedDuringCommit()) {
    return;
  }

  for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
    final Object entryKey = mod.getKey();
    final TXEntryState entryState = mod.getValue();
    entryState.checkForConflict(r, entryKey);
  }

  if (this.uaMods == null) {
    return;
  }
  r.checkReadiness();
  for (Object element : this.uaMods.entrySet()) {
    final Map.Entry attrMod = (Map.Entry) element;
    final Object entryKey = attrMod.getKey();
    final TXEntryUserAttrState attrState = (TXEntryUserAttrState) attrMod.getValue();
    attrState.checkForConflict(r, entryKey);
  }
}
/**
 * For each entry that is not dirty (all we did was read it) decrement its refcount (so it can be
 * evicted as we apply our writes) and remove it from entryMods (so we don't keep iterating over
 * it and so we don't try to clean it up again later).
 *
 * <p>
 * An entry is removed exactly when its state's {@code cleanupNonDirty} call returns
 * {@code true}.
 *
 * @param r the region passed to each entry state's {@code cleanupNonDirty}
 */
void cleanupNonDirtyEntries(InternalRegion r) {
  final Iterator<TXEntryState> states = this.entryMods.values().iterator();
  while (states.hasNext()) {
    final TXEntryState entryState = states.next();
    if (entryState.cleanupNonDirty(r)) {
      // removing through the values iterator drops the whole key -> state mapping
      states.remove();
    }
  }
}
/**
 * Adds this region's entry modifications to the commit message {@code msg}.
 *
 * <p>
 * Nothing is added for a local-scope region or when no entries are tracked. While the entries
 * are added, the recipients are recomputed from the region's TX advice (distributed scopes
 * only), each entry's filter routing info and each entry's adjunct recipients. If the result
 * differs from the remembered members, the message is flagged to update the lock members and the
 * new set is remembered.
 *
 * @param r the region whose changes are being described
 * @param msg the commit message to add to
 */
void buildMessage(InternalRegion r, TXCommitMessage msg) {
  try {
    if (r.getScope().isLocal() || this.entryMods.isEmpty()) {
      return;
    }
    msg.startRegion(r, this.entryMods.size());

    final Set<InternalDistributedMember> recipients = new HashSet<InternalDistributedMember>();
    if (r.getScope().isDistributed()) {
      final DistributedRegion dr = (DistributedRegion) r;
      msg.addViewVersion(dr, dr.getDistributionAdvisor().startOperation());
      recipients.addAll(dr.getCacheDistributionAdvisor().adviseTX());
    }

    for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
      final Object entryKey = mod.getKey();
      final TXEntryState entryState = mod.getValue();
      entryState.buildMessage(r, entryKey, msg, this.otherMembers);
      if (entryState.getFilterRoutingInfo() != null) {
        recipients.addAll(entryState.getFilterRoutingInfo().getMembers());
      }
      if (entryState.getAdjunctRecipients() != null) {
        recipients.addAll(entryState.getAdjunctRecipients());
      }
    }

    if (!recipients.equals(this.otherMembers)) {
      // The participant list has changed (bug 32999): flag the message so that the lock manager
      // gets updated with the new member set.
      msg.setUpdateLockMembers();
      this.otherMembers = recipients;
    }
    msg.finishRegion(this.otherMembers);
  } catch (RegionDestroyedException ignored) {
    // The region was destroyed after conflict checking passed. Treat it as if the destroy
    // happened right after the commit: do nothing, including no distribution of this region's
    // commit data.
  } catch (CancelException ignored) {
    // The cache was closed after conflict checking passed. So do nothing.
  }
}
/**
 * Adds this region's entry modifications to the commit message {@code msg}, computing the
 * recipients only from each entry's filter routing info and adjunct recipients (the region's TX
 * advice is not consulted here).
 *
 * <p>
 * Nothing is added for a local-scope region or when no entries are tracked. If the computed
 * recipients differ from the remembered members, the message is flagged to update the lock
 * members and the new set is remembered.
 *
 * @param r the region whose changes are being described
 * @param msg the commit message to add to
 */
void buildMessageForAdjunctReceivers(InternalRegion r, TXCommitMessage msg) {
  try {
    if (r.getScope().isLocal() || this.entryMods.isEmpty()) {
      return;
    }
    msg.startRegion(r, this.entryMods.size());

    final Set<InternalDistributedMember> recipients = new HashSet<InternalDistributedMember>();
    for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
      final Object entryKey = mod.getKey();
      final TXEntryState entryState = mod.getValue();
      entryState.buildMessage(r, entryKey, msg, this.otherMembers);
      if (entryState.getFilterRoutingInfo() != null) {
        recipients.addAll(entryState.getFilterRoutingInfo().getMembers());
      }
      if (entryState.getAdjunctRecipients() != null) {
        Set adjunctRecipients = entryState.getAdjunctRecipients();
        recipients.addAll(adjunctRecipients);
      }
    }

    if (!recipients.equals(this.otherMembers)) {
      // The participant list has changed (bug 32999): flag the message so that the lock manager
      // gets updated with the new member set.
      msg.setUpdateLockMembers();
      this.otherMembers = recipients;
    }
    msg.finishRegion(this.otherMembers);
  } catch (RegionDestroyedException ignored) {
    // The region was destroyed after conflict checking passed. Treat it as if the destroy
    // happened right after the commit: do nothing, including no distribution of this region's
    // commit data.
  } catch (CancelException ignored) {
    // The cache was closed after conflict checking passed. So do nothing.
  }
}
/**
 * Adds the complete form of every tracked entry modification to the commit message
 * {@code msg}. Nothing is added when no entries are tracked. Unlike {@code buildMessage}, the
 * region's scope is not consulted.
 *
 * @param r the region whose changes are being described
 * @param msg the commit message to add to
 */
void buildCompleteMessage(InternalRegion r, TXCommitMessage msg) {
  try {
    if (this.entryMods.isEmpty()) {
      return;
    }
    msg.startRegion(r, this.entryMods.size());
    for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
      final Object entryKey = mod.getKey();
      final TXEntryState entryState = mod.getValue();
      entryState.buildCompleteMessage(r, entryKey, msg, this.otherMembers);
    }
    msg.finishRegionComplete();
  } catch (RegionDestroyedException ignored) {
    // The region was destroyed after conflict checking passed. Treat it as if the destroy
    // happened right after the commit: do nothing, including no distribution of this region's
    // commit data.
  } catch (CancelException ignored) {
    // The cache was closed after conflict checking passed. So do nothing.
  }
}
/**
 * Marks the start of applying this transaction's changes by calling {@code txLRUStart()} on the
 * region. A destroyed region or a closed cache is deliberately ignored.
 *
 * @param r the region about to receive the changes
 * @param txState not used by this method
 */
void applyChangesStart(InternalRegion r, TXStateInterface txState) {
  try {
    r.txLRUStart();
  } catch (RegionDestroyedException ignored) {
    // The region was destroyed after conflict checking passed. Treat it as if the destroy
    // happened right after the commit: do nothing, including no distribution of this region's
    // commit data.
  } catch (CancelException ignored) {
    // The cache was closed after conflict checking passed. So do nothing.
  }
}
/**
 * Applies every recorded user-attribute modification to region {@code r} and then calls
 * {@code txLRUEnd()} on it. {@code txLRUEnd()} runs even when applying a modification throws. A
 * destroyed region or a closed cache is deliberately ignored.
 *
 * @param r the region receiving the changes
 * @param txState not used by this method
 */
void applyChangesEnd(InternalRegion r, TXStateInterface txState) {
  try {
    try {
      if (this.uaMods != null) {
        for (Object element : this.uaMods.entrySet()) {
          final Map.Entry attrMod = (Map.Entry) element;
          final Object entryKey = attrMod.getKey();
          final TXEntryUserAttrState attrState = (TXEntryUserAttrState) attrMod.getValue();
          attrState.applyChanges(r, entryKey);
        }
      }
    } finally {
      r.txLRUEnd();
    }
  } catch (RegionDestroyedException ignored) {
    // The region was destroyed after conflict checking passed. Treat it as if the destroy
    // happened right after the commit: do nothing, including no distribution of this region's
    // commit data.
  } catch (CancelException ignored) {
    // The cache was closed after conflict checking passed. So do nothing.
  }
}
/**
 * Appends to {@code events} one event for every tracked entry that is dirty and whose operation
 * counts as an event for region {@code r}.
 *
 * @param r the region the events belong to
 * @param events the list to append to
 * @param txs passed through to each entry state's {@code getEvent}
 */
void getEvents(InternalRegion r, ArrayList events, TXState txs) {
  for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
    final Object entryKey = mod.getKey();
    final TXEntryState entryState = mod.getValue();
    if (!entryState.isDirty() || !entryState.isOpAnyEvent(r)) {
      continue;
    }
    // OFFHEAP: these events are released when TXEvent.release is called
    events.add(entryState.getEvent(r, entryKey, txs));
  }
}
/**
 * Put all the entries this region knows about into the given "entries" list as instances of
 * TXEntryStateWithRegionAndKey.
 *
 * @param entries the list to append to
 * @param r the region recorded in each appended element
 */
void getEntries(ArrayList/* <TXEntryStateWithRegionAndKey> */ entries, InternalRegion r) {
  for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
    final Object entryKey = mod.getKey();
    final TXEntryState entryState = mod.getValue();
    entries.add(new TXState.TXEntryStateWithRegionAndKey(entryState, r, entryKey));
  }
}
/**
 * Cleans up every tracked entry state and clears the region's in-use-by-transaction flag. Only
 * the first call does any work; later calls return immediately.
 *
 * <p>
 * The flag is cleared in a {@code finally} block: because {@code cleanedUp} is set before the
 * entries are processed, a failure while cleaning an entry could otherwise never be retried and
 * would leave the region flagged as in use by a transaction. Any exception thrown by an entry's
 * cleanup still propagates to the caller.
 *
 * @param r the region passed to each entry state's {@code cleanup}
 */
void cleanup(InternalRegion r) {
  if (this.cleanedUp) {
    return;
  }
  this.cleanedUp = true;
  try {
    for (TXEntryState entryState : this.entryMods.values()) {
      entryState.cleanup(r);
    }
  } finally {
    // Mirrors setInUseByTransaction(true) in the constructor; note it targets this.region.
    this.region.setInUseByTransaction(false);
  }
}
/**
 * Counts the changes recorded for this region.
 *
 * @return the number of dirty entry states plus the number of user-attribute modifications
 */
int getChanges() {
  int dirtyEntries = 0;
  for (TXEntryState entryState : this.entryMods.values()) {
    if (entryState.isDirty()) {
      dirtyEntries++;
    }
  }
  final int userAttrChanges = (this.uaMods == null) ? 0 : this.uaMods.size();
  return dirtyEntries + userAttrChanges;
}
/**
 * @return the transaction state this region state was created with
 */
public TXState getTXState() {
  return this.txState;
}
/**
 * Closes every tracked entry state.
 */
public void close() {
  final Iterator<TXEntryState> states = this.entryMods.values().iterator();
  while (states.hasNext()) {
    states.next().close();
  }
}
/**
 * @return a description made of the default {@code Object} string form, the tracked entry
 *         modifications and the created-during-commit flag
 */
@Override
public String toString() {
  final String identity = super.toString();
  return "{" + identity + " " + " ,entryMods=" + this.entryMods + " ,isCreatedDuringCommit="
      + this.isCreatedDuringCommit() + "}";
}
/**
 * Tells whether this region state was flagged as created during commit.
 *
 * @return the createdDuringCommit
 */
public boolean isCreatedDuringCommit() {
  return this.createdDuringCommit;
}
/**
 * Flags (or un-flags) this region state as created during commit.
 *
 * @param createdDuringCommit the createdDuringCommit to set
 */
public void setCreatedDuringCommit(boolean createdDuringCommit) {
  this.createdDuringCommit = createdDuringCommit;
}
/**
 * Appends the thin DistTx entry state of every tracked entry to {@code entryStateList}, in the
 * iteration order of the entry map.
 *
 * <p>
 * A {@link RegionDestroyedException} or {@link CancelException} raised while collecting is not
 * propagated; it is reported at debug level (including the exception itself) and the method
 * returns {@code false}. Elements appended before the failure stay in the list.
 *
 * @param entryStateList the list to append to
 * @return {@code true} if all entries were processed, {@code false} if the region was destroyed
 *         or the cache was closed while collecting
 */
public boolean populateDistTxEntryStateList(ArrayList<DistTxThinEntryState> entryStateList) {
  final String regionFullPath = this.getRegion().getFullPath();
  final Throwable failure;
  try {
    // [DISTTX] TODO Sort this first
    for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
      final Object entryKey = mod.getKey();
      final TXEntryState entryState = mod.getValue();
      final DistTxThinEntryState thinEntryState = entryState.getDistTxEntryStates();
      entryStateList.add(thinEntryState);
      if (logger.isDebugEnabled()) {
        logger.debug("TXRegionState.populateDistTxEntryStateList Added " + thinEntryState
            + " for key=" + entryKey + " ,op=" + entryState.opToString() + " ,region="
            + regionFullPath);
      }
    }
    return true;
  } catch (RegionDestroyedException ex) {
    // The region was destroyed after conflict checking passed. Treat it as if the destroy
    // happened right after the commit: do nothing, including no distribution of this region's
    // commit data.
    failure = ex;
  } catch (CancelException ex) {
    // The cache was closed after conflict checking passed. So do nothing.
    failure = ex;
  }
  if (logger.isDebugEnabled()) {
    // Pass the exception along so the debug output shows which condition stopped the collection.
    logger.debug(
        "TXRegionState.populateDistTxEntryStateList Got exception for region " + regionFullPath,
        failure);
  }
  return false;
}
/**
 * Assigns the given thin DistTx entry states to the tracked entries. The element at position
 * {@code i} of {@code entryEventList} goes to the {@code i}-th entry in the iteration order of
 * the entry map.
 *
 * @param entryEventList the thin entry states to assign
 * @throws UnsupportedOperationInTransactionException if the list size differs from the number
 *         of tracked entries
 */
public void setDistTxEntryStates(ArrayList<DistTxThinEntryState> entryEventList) {
  final String regionFullPath = this.getRegion().getFullPath();
  final int expectedCount = this.entryMods.size();
  final int receivedCount = entryEventList.size();
  if (expectedCount != receivedCount) {
    final String expectation =
        "entry size of " + expectedCount + " for region " + regionFullPath;
    throw new UnsupportedOperationInTransactionException(
        String.format("Expected %s during a distributed transaction but got %s", expectation,
            receivedCount));
  }

  int position = 0;
  // [DISTTX] TODO Sort this first
  for (Entry<Object, TXEntryState> mod : this.entryMods.entrySet()) {
    final Object entryKey = mod.getKey();
    final TXEntryState entryState = mod.getValue();
    final DistTxThinEntryState thinEntryState = entryEventList.get(position);
    position++;
    entryState.setDistTxEntryStates(thinEntryState);
    if (logger.isDebugEnabled()) {
      logger.debug("TxRegionState.setDistTxEntryStates Added " + thinEntryState + " for key="
          + entryKey + " ,op=" + entryState.opToString() + " ,region=" + regionFullPath);
    }
  }
}
}