| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeMap; |
| |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.InvalidDeltaException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.CommitConflictException; |
| import org.apache.geode.cache.DiskAccessException; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.TransactionDataRebalancedException; |
| import org.apache.geode.cache.TransactionWriter; |
| import org.apache.geode.cache.TransactionWriterException; |
| import org.apache.geode.cache.UnsupportedOperationInTransactionException; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState; |
| import org.apache.geode.internal.cache.partitioned.PutAllPRMessage; |
| import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage; |
| import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; |
| import org.apache.geode.internal.cache.tx.DistTxEntryEvent; |
| import org.apache.geode.internal.cache.tx.DistTxKeyInfo; |
| import org.apache.geode.internal.cache.versions.RegionVersionVector; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.internal.statistics.StatisticsClock; |
| |
| /** |
| * TxState on a data node VM |
| * |
| * |
| */ |
| public class DistTXState extends TXState { |
| |
| @MutableForTesting |
| public static Runnable internalBeforeApplyChanges; // TODO: cleanup this test hook |
| @MutableForTesting |
| public static Runnable internalBeforeNonTXBasicPut; // TODO: cleanup this test hook |
| |
| private boolean updatingTxStateDuringPreCommit = false; |
| |
| public DistTXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub, |
| StatisticsClock statisticsClock) { |
| super(proxy, onBehalfOfRemoteStub, statisticsClock); |
| } |
| |
| @Override |
| protected void cleanup() { |
| super.cleanup(); |
| // Do nothing for now |
| } |
| |
| /* |
| * If this is a primary member, for each entry in TXState, generate next region version and store |
| * in the entry. |
| */ |
| public void updateRegionVersions() { |
| |
| Iterator<Map.Entry<InternalRegion, TXRegionState>> it = this.regions.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<InternalRegion, TXRegionState> me = it.next(); |
| InternalRegion r = me.getKey(); |
| TXRegionState txrs = me.getValue(); |
| |
| // Generate next region version only on the primary |
| if (!txrs.isCreatedDuringCommit()) { |
| try { |
| Set entries = txrs.getEntryKeys(); |
| if (!entries.isEmpty()) { |
| Iterator entryIt = entries.iterator(); |
| while (entryIt.hasNext()) { |
| Object key = entryIt.next(); |
| TXEntryState txes = txrs.getTXEntryState(key); |
| RegionVersionVector rvv = r.getVersionVector(); |
| if (rvv != null) { |
| long v = rvv.getNextVersion(); |
| // txes.setNextRegionVersion(v); |
| txes.getDistTxEntryStates().setRegionVersion(v); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Set next region version to " + v + " for region=" + r.getName() |
| + "in TXEntryState for key" + key); |
| } |
| } |
| } |
| } |
| } catch (DiskAccessException dae) { |
| r.handleDiskAccessException(dae); |
| throw dae; |
| } |
| } |
| } |
| } |
| |
| /* |
| * Iterate through all changes and for those changes for which this member hosts a primary bucket, |
| * generate a tail key and store in the TXEntryState. From there it is expected to be carried over |
| * to the secondaries in phase-2 commit. In phase-2 commit, the both the primary and secondaries |
| * should use this tail key to enqueue into parallel queues. |
| */ |
| public void generateTailKeysForParallelDispatcherEvents() { |
| Iterator<Map.Entry<InternalRegion, TXRegionState>> it = this.regions.entrySet().iterator(); |
| |
| while (it.hasNext()) { |
| Map.Entry<InternalRegion, TXRegionState> me = it.next(); |
| InternalRegion r = me.getKey(); |
| TXRegionState txrs = me.getValue(); |
| |
| InternalRegion region = txrs.getRegion(); |
| // Check if it is a bucket region |
| if (region.isUsedForPartitionedRegionBucket()) { |
| // Check if it is a primary bucket |
| BucketRegion bRegion = (BucketRegion) region; |
| if (!(bRegion instanceof AbstractBucketRegionQueue)) { |
| if (bRegion.getBucketAdvisor().isPrimary()) { |
| |
| // Generate a tail key for each entry |
| Set entries = txrs.getEntryKeys(); |
| if (!entries.isEmpty()) { |
| Iterator entryIt = entries.iterator(); |
| while (entryIt.hasNext()) { |
| Object key = entryIt.next(); |
| TXEntryState txes = txrs.getTXEntryState(key); |
| |
| long tailKey = ((BucketRegion) region).generateTailKey(); |
| txes.getDistTxEntryStates().setTailKey(tailKey); |
| } |
| } |
| } // end if primary |
| } |
| } |
| } |
| } |
| |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.cache.TXStateInterface#commit() |
| * |
| * Take Locks Does conflict check on primary ([DISTTX] TODO on primary only) Invoke TxWriter |
| */ |
| @Override |
| public void precommit() |
| throws CommitConflictException, UnsupportedOperationInTransactionException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState.precommit transaction {} is closed {} ", getTransactionId(), |
| this.closed/* , new Throwable() */); |
| } |
| |
| if (this.closed) { |
| return; |
| } |
| |
| synchronized (this.completionGuard) { |
| this.completionStarted = true; |
| } |
| |
| if (onBehalfOfRemoteStub && !proxy.isCommitOnBehalfOfRemoteStub()) { |
| throw new UnsupportedOperationInTransactionException( |
| "Cannot commit a transaction being run on behalf of a remote thread"); |
| } |
| |
| cleanupNonDirtyRegions(); |
| |
| /* |
| * Lock buckets so they can't be rebalanced then perform the conflict check to fix #43489 |
| */ |
| try { |
| lockBucketRegions(); |
| } catch (PrimaryBucketException pbe) { |
| // not sure what to do here yet |
| RuntimeException re = new TransactionDataRebalancedException( |
| "Transactional data moved, due to rebalancing."); |
| re.initCause(pbe); |
| throw re; |
| } |
| |
| if (this.locks == null) { |
| reserveAndCheck(); |
| } |
| |
| // For internal testing |
| if (this.internalAfterConflictCheck != null) { |
| this.internalAfterConflictCheck.run(); |
| } |
| |
| updateRegionVersions(); |
| |
| generateTailKeysForParallelDispatcherEvents(); |
| |
| /* |
| * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort the |
| * transaction. |
| */ |
| TransactionWriter writer = this.proxy.getTxMgr().getWriter(); |
| if (!firedWriter && writer != null) { |
| try { |
| firedWriter = true; |
| writer.beforeCommit(getEvent()); |
| } catch (TransactionWriterException twe) { |
| cleanup(); |
| throw new CommitConflictException(twe); |
| } catch (VirtualMachineError err) { |
| // cleanup(); this allocates objects so I don't think we can do it - |
| // that leaves the TX open, but we are poison pilling so we should be |
| // ok?? |
| |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| cleanup(); // rollback the transaction! |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| throw new CommitConflictException(t); |
| } |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.cache.TXStateInterface#commit() |
| * |
| * Apply changes release locks |
| */ |
| @Override |
| public void commit() throws CommitConflictException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState.commit transaction {} is closed {} ", getTransactionId(), |
| this.closed/* , new Throwable() */); |
| } |
| |
| if (this.closed) { |
| return; |
| } |
| |
| try { |
| List/* <TXEntryStateWithRegionAndKey> */ entries = generateEventOffsets(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("commit entries " + entries); |
| } |
| TXCommitMessage msg = null; |
| try { |
| attachFilterProfileInformation(entries); |
| |
| if (internalBeforeApplyChanges != null) { |
| internalBeforeApplyChanges.run(); |
| } |
| |
| // apply changes to the cache |
| applyChanges(entries); |
| |
| // For internal testing |
| if (this.internalAfterApplyChanges != null) { |
| this.internalAfterApplyChanges.run(); |
| } |
| |
| // [DISTTX]TODO: |
| // Build a message specifically for those nodes who |
| // hold gateway senders and listeners but not a copy of the buckets |
| // on which changes in this tx are done. |
| // This is applicable only for partitioned regions and |
| // serial gateway senders. |
| // This works only if the coordinator and sender are not the same node. |
| // For same sender as coordinator, this results in a hang, which needs to be addressed. |
| // If an another method of notifying adjunct receivers is implemented, |
| // the following two lines should be commented out. |
| msg = buildMessageForAdjunctReceivers(); |
| msg.send(this.locks.getDistributedLockId()); |
| |
| // Fire callbacks collected in the local txApply* executions |
| firePendingCallbacks(); |
| |
| this.commitMessage = buildCompleteMessage(); |
| |
| } finally { |
| if (msg != null) { |
| msg.releaseViewVersions(); |
| } |
| this.locks.releaseLocal(); |
| // For internal testing |
| if (this.internalAfterReleaseLocalLocks != null) { |
| this.internalAfterReleaseLocalLocks.run(); |
| } |
| } |
| } finally { |
| cleanup(); |
| } |
| } |
| |
| /** |
| * this builds a new DistTXAdjunctCommitMessage and returns it |
| * |
| * @return the new message |
| */ |
| protected TXCommitMessage buildMessageForAdjunctReceivers() { |
| TXCommitMessage msg = |
| new DistTXAdjunctCommitMessage(this.proxy.getTxId(), this.proxy.getTxMgr().getDM(), this); |
| Iterator<Map.Entry<InternalRegion, TXRegionState>> it = this.regions.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<InternalRegion, TXRegionState> me = it.next(); |
| InternalRegion r = me.getKey(); |
| TXRegionState txrs = me.getValue(); |
| |
| // only on the primary |
| if (r.isUsedForPartitionedRegionBucket() && !txrs.isCreatedDuringCommit()) { |
| txrs.buildMessageForAdjunctReceivers(r, msg); |
| } |
| } |
| return msg; |
| } |
| |
| |
| @Override |
| public void rollback() { |
| super.rollback(); |
| // Cleanup is called next |
| } |
| |
| protected boolean applyOpsOnRedundantCopy(DistributedMember sender, |
| ArrayList<DistTxEntryEvent> secondaryTransactionalOperations) { |
| boolean returnValue = true; |
| try { |
| boolean result = true; |
| |
| // Start TxState Update During PreCommit phase |
| setUpdatingTxStateDuringPreCommit(true); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState.applyOpOnRedundantCopy: size of " |
| + "secondaryTransactionalOperations = {}", secondaryTransactionalOperations.size()); |
| } |
| /* |
| * Handle Put Operations meant for secondary. |
| * |
| * @see org.apache.geode.internal.cache.partitioned.PutMessage. |
| * operateOnPartitionedRegion(DistributionManager, PartitionedRegion, long) |
| * |
| * [DISTTX] TODO need to handle other operations |
| */ |
| for (DistTxEntryEvent dtop : secondaryTransactionalOperations) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState.applyOpOnRedundantCopy: processing dist " + "tx operation {}", |
| dtop); |
| } |
| dtop.setDistributedMember(sender); |
| dtop.setOriginRemote(false); |
| |
| /* |
| * [DISTTX} TODO handle call back argument version tag and other settings in PutMessage |
| */ |
| String failureReason = null; |
| try { |
| if (dtop.getRegion() == null) { |
| // Tx event from the peer. |
| if (dtop.getRegionName() == null) { |
| throw new InternalGemFireException("Region is unavailable on DistTxEntryEvent."); |
| } |
| dtop.setRegion((LocalRegion) getCache().getRegion(dtop.getRegionName())); |
| } |
| |
| if (dtop.getKeyInfo().isDistKeyInfo()) { |
| dtop.getKeyInfo().setCheckPrimary(false); |
| } else { |
| dtop.setKeyInfo(new DistTxKeyInfo(dtop.getKeyInfo())); |
| dtop.getKeyInfo().setCheckPrimary(false); |
| } |
| |
| // apply the op |
| result = applyIndividualOp(dtop); |
| |
| if (!result) { // make sure the region hasn't gone away |
| dtop.getRegion().checkReadiness(); |
| } |
| } catch (CacheWriterException cwe) { |
| result = false; |
| failureReason = "CacheWriterException"; |
| } catch (PrimaryBucketException pbe) { |
| result = false; |
| failureReason = "PrimaryBucketException"; |
| } catch (InvalidDeltaException ide) { |
| result = false; |
| failureReason = "InvalidDeltaException"; |
| } catch (DataLocationException e) { |
| result = false; |
| failureReason = "DataLocationException"; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState.applyOpOnRedundantCopy {} ##op {}, " + "##region {}, ##key {}", |
| (result ? " sucessfully applied op " : " failed to apply op due to " + failureReason), |
| dtop.getOperation(), dtop.getRegion().getName(), dtop.getKey()); |
| } |
| if (!result) { |
| returnValue = false; |
| break; |
| } |
| } |
| } finally { |
| // End TxState Update During PreCommit phase |
| setUpdatingTxStateDuringPreCommit(false); |
| } |
| return returnValue; |
| } |
| |
| /** |
| * Apply the individual tx op on secondary |
| * |
| * Calls local function such as putEntry instead of putEntryOnRemote as for this |
| * {@link DistTXStateOnCoordinator} as events will always be local. In parent {@link DistTXState} |
| * class will call remote version of functions |
| * |
| */ |
| protected boolean applyIndividualOp(DistTxEntryEvent dtop) throws DataLocationException { |
| boolean result = true; |
| if (dtop.op.isUpdate() || dtop.op.isCreate()) { |
| if (dtop.op.isPutAll()) { |
| assert (dtop.getPutAllOperation() != null); |
| // [DISTTX] TODO what do with versions next? |
| final VersionedObjectList versions = |
| new VersionedObjectList(dtop.getPutAllOperation().putAllDataSize, true, |
| dtop.getRegion().getConcurrencyChecksEnabled()); |
| postPutAll(dtop.getPutAllOperation(), versions, dtop.getRegion()); |
| } else { |
| result = putEntryOnRemote(dtop, false/* ifNew */, false/* ifOld */, |
| null/* expectedOldValue */, false/* requireOldValue */, 0L/* lastModified */, |
| true/* |
| * overwriteDestroyed *not* used |
| */); |
| } |
| } else if (dtop.op.isDestroy()) { |
| if (dtop.op.isRemoveAll()) { |
| assert (dtop.getRemoveAllOperation() != null); |
| // [DISTTX] TODO what do with versions next? |
| final VersionedObjectList versions = |
| new VersionedObjectList(dtop.getRemoveAllOperation().removeAllDataSize, true, |
| dtop.getRegion().getConcurrencyChecksEnabled()); |
| postRemoveAll(dtop.getRemoveAllOperation(), versions, dtop.getRegion()); |
| } else { |
| destroyOnRemote(dtop, false/* TODO [DISTTX] */, null/* |
| * TODO [DISTTX] |
| */); |
| } |
| } else if (dtop.op.isInvalidate()) { |
| invalidateOnRemote(dtop, true/* TODO [DISTTX] */, false/* |
| * TODO [DISTTX] |
| */); |
| } else { |
| logger.debug("DistTXCommitPhaseOneMessage: unsupported TX operation {}", dtop); |
| assert (false); |
| } |
| return result; |
| } |
| |
| |
| public boolean isUpdatingTxStateDuringPreCommit() { |
| return updatingTxStateDuringPreCommit; |
| } |
| |
| /** |
| * For Dist Tx |
| * |
| * @param updatingTxState if updating TxState during Commit Phase |
| */ |
| private void setUpdatingTxStateDuringPreCommit(boolean updatingTxState) |
| throws UnsupportedOperationInTransactionException { |
| this.updatingTxStateDuringPreCommit = updatingTxState; |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState setUpdatingTxStateDuringPreCommit incoming {} final {} ", |
| updatingTxState, this.updatingTxStateDuringPreCommit); |
| } |
| } |
| |
| @Override |
| public TXRegionState writeRegion(InternalRegion r) { |
| TXRegionState result = readRegion(r); |
| if (result == null) { |
| if (r instanceof BucketRegion) { |
| result = new TXBucketRegionState((BucketRegion) r, this); |
| } else { |
| result = new TXRegionState(r, this); |
| } |
| result.setCreatedDuringCommit(this.updatingTxStateDuringPreCommit); |
| this.regions.put(r, result); |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState writeRegion flag {} new region-state {} ", |
| this.updatingTxStateDuringPreCommit, result); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTXState writeRegion flag {} region-state {} ", |
| this.updatingTxStateDuringPreCommit, result); |
| } |
| } |
| |
| return result; |
| } |
| |
| |
| /* |
| * [DISTTX] Note: This has been overridden here to associate DistKeyInfo with event to disable |
| * primary check(see DistKeyInfo.setCheckPrimary(false)) when this gets called on secondary of a |
| * PR |
| * |
| * For TX this needs to be a PR passed in as region |
| * |
| * |
| * @see org.apache.geode.internal.cache.InternalDataView#postPutAll(org.apache |
| * .gemfire.internal.cache.DistributedPutAllOperation, java.util.Map, |
| * org.apache.geode.internal.cache.LocalRegion) |
| */ |
| @Override |
| public void postPutAll(final DistributedPutAllOperation putallOp, |
| final VersionedObjectList successfulPuts, InternalRegion reg) { |
| |
| final InternalRegion theRegion; |
| if (reg instanceof BucketRegion) { |
| theRegion = ((BucketRegion) reg).getPartitionedRegion(); |
| } else { |
| theRegion = reg; |
| } |
| /* |
| * Don't fire events here. |
| */ |
| /* |
| * We are on the data store, we don't need to do anything here. Commit will push them out. |
| */ |
| /* |
| * We need to put this into the tx state. |
| */ |
| theRegion.syncBulkOp(new Runnable() { |
| @Override |
| public void run() { |
| // final boolean requiresRegionContext = |
| // theRegion.keyRequiresRegionContext(); |
| InternalDistributedMember myId = |
| theRegion.getDistributionManager().getDistributionManagerId(); |
| for (int i = 0; i < putallOp.putAllDataSize; ++i) { |
| @Released |
| EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion, myId, myId, i, |
| putallOp.putAllData, false, putallOp.getBaseEvent().getContext(), false, |
| !putallOp.getBaseEvent().isGenerateCallbacks()); |
| try { |
| // ev.setPutAllOperation(putallOp); |
| |
| // below if condition returns true on secondary when TXState is |
| // updated in preCommit only on secondary |
| // In this case disable the primary check by calling |
| // distKeyInfo.setCheckPrimary(false); |
| if (isUpdatingTxStateDuringPreCommit()) { |
| KeyInfo keyInfo = ev.getKeyInfo(); |
| DistTxKeyInfo distKeyInfo = new DistTxKeyInfo(keyInfo); |
| distKeyInfo.setCheckPrimary(false); |
| ev.setKeyInfo(distKeyInfo); |
| } |
| /* |
| * Whenever commit is called, especially when its a DistTxStateOnCoordinator the txState |
| * is set to null in @see TXManagerImpl.commit() and thus when @see LocalRegion.basicPut |
| * will be called as in this function, they will not found a TxState with call for |
| * getDataView() |
| */ |
| if (!(theRegion.getDataView() instanceof TXStateInterface)) { |
| if (putEntry(ev, false, false, null, false, 0L, false)) { |
| successfulPuts.addKeyAndVersion(putallOp.putAllData[i].key, null); |
| } |
| } else if (theRegion.basicPut(ev, false, false, null, false)) { |
| successfulPuts.addKeyAndVersion(putallOp.putAllData[i].key, null); |
| } |
| } finally { |
| ev.release(); |
| } |
| } |
| } |
| }, putallOp.getBaseEvent().getEventId()); |
| |
| } |
| |
| @Override |
| public void postRemoveAll(final DistributedRemoveAllOperation op, |
| final VersionedObjectList successfulOps, InternalRegion reg) { |
| final InternalRegion theRegion; |
| if (reg instanceof BucketRegion) { |
| theRegion = ((BucketRegion) reg).getPartitionedRegion(); |
| } else { |
| theRegion = reg; |
| } |
| /* |
| * Don't fire events here. We are on the data store, we don't need to do anything here. Commit |
| * will push them out. We need to put this into the tx state. |
| */ |
| theRegion.syncBulkOp(new Runnable() { |
| @Override |
| public void run() { |
| InternalDistributedMember myId = |
| theRegion.getDistributionManager().getDistributionManagerId(); |
| for (int i = 0; i < op.removeAllDataSize; ++i) { |
| @Released |
| EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion, myId, myId, i, |
| op.removeAllData, false, op.getBaseEvent().getContext(), false, |
| !op.getBaseEvent().isGenerateCallbacks()); |
| try { |
| ev.setRemoveAllOperation(op); |
| // below if condition returns true on secondary when TXState is |
| // updated in preCommit only on secondary |
| // In this case disable the primary check by calling |
| // distKeyInfo.setCheckPrimary(false); |
| if (isUpdatingTxStateDuringPreCommit()) { |
| KeyInfo keyInfo = ev.getKeyInfo(); |
| DistTxKeyInfo distKeyInfo = new DistTxKeyInfo(keyInfo); |
| distKeyInfo.setCheckPrimary(false); |
| ev.setKeyInfo(distKeyInfo); |
| } |
| /* |
| * Whenever commit is called, especially when its a DistTxStateOnCoordinator the txState |
| * is set to null in @see TXManagerImpl.commit() and thus when basicDestroy will be |
| * called will be called as in i.e. @see LocalRegion.basicDestroy, they will not found a |
| * TxState with call for getDataView() |
| * |
| * [DISTTX] TODO verify if this is correct to call destroyExistingEntry directly? |
| */ |
| try { |
| if (!(theRegion.getDataView() instanceof TXStateInterface)) { |
| destroyExistingEntry(ev, true/* should we invoke cacheWriter? */, null); |
| } else { |
| theRegion.basicDestroy(ev, true/* should we invoke cacheWriter? */, null); |
| } |
| } catch (EntryNotFoundException ignore) { |
| } |
| successfulOps.addKeyAndVersion(op.removeAllData[i].key, null); |
| } finally { |
| ev.release(); |
| } |
| } |
| } |
| }, op.getBaseEvent().getEventId()); |
| |
| } |
| |
| @Override |
| public boolean isDistTx() { |
| return true; |
| } |
| |
| /* |
| * Populate list of entry states for each region while replying precommit |
| */ |
| public boolean populateDistTxEntryStateList( |
| TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap) { |
| for (Map.Entry<InternalRegion, TXRegionState> me : this.regions.entrySet()) { |
| InternalRegion r = me.getKey(); |
| TXRegionState txrs = me.getValue(); |
| String regionFullPath = r.getFullPath(); |
| if (!txrs.isCreatedDuringCommit()) { |
| ArrayList<DistTxThinEntryState> entryStateList = new ArrayList<DistTxThinEntryState>(); |
| boolean returnValue = txrs.populateDistTxEntryStateList(entryStateList); |
| if (returnValue) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTxState.populateDistTxEntryStateList Adding entries " + " with count=" |
| + entryStateList.size() + " for region " + regionFullPath + " . Added list=" |
| + entryStateList); |
| } |
| entryStateSortedMap.put(regionFullPath, entryStateList); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTxState.populateDistTxEntryStateList Got exception for region " |
| + regionFullPath); |
| } |
| return false; |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTxState.populateDistTxEntryStateList Not adding entries for region " |
| + regionFullPath); |
| } |
| } |
| } |
| return true; |
| } |
| |
| /* |
| * Set list of entry states for each region while applying commit |
| */ |
| public void setDistTxEntryStates(ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) { |
| TreeMap<String, TXRegionState> regionSortedMap = new TreeMap<>(); |
| for (TXRegionState txrs : this.regions.values()) { |
| if (txrs.isCreatedDuringCommit()) { |
| regionSortedMap.put(txrs.getRegion().getFullPath(), txrs); |
| } |
| } |
| |
| int index = 0; |
| for (Entry<String, TXRegionState> me : regionSortedMap.entrySet()) { |
| String regionFullPath = me.getKey(); |
| TXRegionState txrs = me.getValue(); |
| ArrayList<DistTxThinEntryState> entryEvents = entryEventList.get(index++); |
| if (logger.isDebugEnabled()) { |
| logger.debug("DistTxState.setDistTxEntryStates For region=" + regionFullPath + " ,index=" |
| + index + " ,entryEvents=(" + entryEvents.size() + ")=" + entryEvents |
| + " ,regionSortedMap=" + regionSortedMap.keySet()); |
| } |
| txrs.setDistTxEntryStates(entryEvents); |
| } |
| } |
| } |