| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.IdentityHashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheClosedException; |
| import org.apache.geode.cache.CacheRuntimeException; |
| import org.apache.geode.cache.CommitDistributionException; |
| import org.apache.geode.cache.CommitIncompleteException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.RegionDistributionException; |
| import org.apache.geode.cache.TransactionId; |
| import org.apache.geode.cache.TransactionListener; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.MembershipListener; |
| import org.apache.geode.distributed.internal.MessageWithReply; |
| import org.apache.geode.distributed.internal.PooledDistributionMessage; |
| import org.apache.geode.distributed.internal.ReliableReplyProcessor21; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.ReplyMessage; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.HeapDataOutputStream; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.cache.LocalRegion.InitializationLevel; |
| import org.apache.geode.internal.cache.locks.TXLockId; |
| import org.apache.geode.internal.cache.locks.TXLockIdImpl; |
| import org.apache.geode.internal.cache.locks.TXLockService; |
| import org.apache.geode.internal.cache.partitioned.Bucket; |
| import org.apache.geode.internal.cache.persistence.PersistentMemberID; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.versions.VersionSource; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * TXCommitMessage is the message that contains all the information that needs to be distributed, on |
| * commit, to other cache members. |
| * |
| * @since GemFire 4.0 |
| */ |
| public class TXCommitMessage extends PooledDistributionMessage |
| implements MembershipListener, MessageWithReply { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| // Keep a 60 second history @ an estimated 1092 transactions/second ~= 16^4 |
| @MakeNotStatic |
| protected static final TXFarSideCMTracker txTracker = new TXFarSideCMTracker((60 * 1092)); |
| |
| private ArrayList<RegionCommit> regions; // list of RegionCommit instances |
| protected TXId txIdent; |
| protected int processorId; // 0 unless needsAck is true |
| protected TXLockIdImpl lockId; |
| protected HashSet farSiders; |
| protected transient DistributionManager dm; // Used on the sending side of this message |
| private transient int sequenceNum = 0; |
| |
| // Maps receiver Serializables to RegionCommitList instances |
| private transient HashMap<InternalDistributedMember, RegionCommitList> msgMap = null; |
| |
| private transient RegionCommit currentRegion; |
| protected transient TXState txState = null; |
| private transient boolean wasProcessed; |
| private transient boolean isProcessing; |
| private transient boolean dontProcess; |
| private transient boolean departureNoticed = false; |
| private transient boolean lockNeedsUpdate = false; |
| private transient boolean ackRequired = true; |
| /** |
| * List of operations to do when processing this tx. Valid on farside only. |
| */ |
| protected transient ArrayList farSideEntryOps; |
| private byte[] farsideBaseMembershipId; // only available on farside |
| private long farsideBaseThreadId; // only available on farside |
| private long farsideBaseSequenceId; // only available on farside |
| |
| /** |
| * (Nearside) true of any regions in this TX have required roles |
| */ |
| private transient boolean hasReliableRegions = false; |
| |
| /** |
| * Set of all caching exceptions produced while processing this tx |
| */ |
| private transient Set processingExceptions = Collections.emptySet(); |
| |
| private ClientProxyMembershipID bridgeContext = null; |
| |
| /** |
| * Version of the client that this TXCommitMessage is being sent to. Used for backwards |
| * compatibility |
| */ |
| private transient Version clientVersion; |
| |
| /** |
| * A token to be put in TXManagerImpl#failoverMap to represent a CommitConflictException while |
| * committing a transaction |
| */ |
| @Immutable |
| public static final TXCommitMessage CMT_CONFLICT_MSG = new TXCommitMessage(); |
| /** |
| * A token to be put in TXManagerImpl#failoverMap to represent a |
| * TransactionDataNodeHasDepartedException |
| * while committing a transaction |
| */ |
| @Immutable |
| public static final TXCommitMessage REBALANCE_MSG = new TXCommitMessage(); |
| /** |
| * A token to be put in TXManagerImpl#failoverMap to represent an exception while committing a |
| * transaction |
| */ |
| @Immutable |
| public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage(); |
| /** |
| * A token to be put in TXManagerImpl#failoverMap to represent a rolled back transaction |
| */ |
| @Immutable |
| public static final TXCommitMessage ROLLBACK_MSG = new TXCommitMessage(); |
| |
| public TXCommitMessage(TXId txIdent, DistributionManager dm, TXState txState) { |
| this.dm = dm; |
| this.txIdent = txIdent; |
| this.lockId = null; |
| this.regions = null; |
| this.txState = txState; |
| this.wasProcessed = false; |
| this.isProcessing = false; |
| this.dontProcess = false; |
| this.farSiders = null; |
| this.bridgeContext = txState.bridgeContext; |
| } |
| |
| public TXCommitMessage() { |
| // zero arg constructor for DataSerializer |
| } |
| |
| public static TXFarSideCMTracker getTracker() { |
| return TXCommitMessage.txTracker; |
| } |
| |
| /** |
| * Create and return an eventId given its offset. |
| * |
| * @since GemFire 5.7 |
| */ |
| protected EventID getEventId(int eventOffset) { |
| return new EventID(this.farsideBaseMembershipId, this.farsideBaseThreadId, |
| this.farsideBaseSequenceId + eventOffset); |
| } |
| |
| /** |
| * Return the TXCommitMessage we have already received that is associated with id. Note because of |
| * bug 37657 we may need to wait for it to show up. |
| */ |
| public static TXCommitMessage waitForMessage(Object id, DistributionManager dm) { |
| TXFarSideCMTracker map = getTracker(); |
| return map.waitForMessage(id, dm); |
| } |
| |
| void startRegion(InternalRegion r, int maxSize) { |
| this.currentRegion = new RegionCommit(this, r, maxSize); |
| if (r.requiresReliabilityCheck()) { |
| this.hasReliableRegions = true; |
| } |
| } |
| |
| void finishRegion(Set<InternalDistributedMember> s) { |
| // make sure we have some changes and someone to send them to |
| if (!this.currentRegion.isEmpty() && s != null && !s.isEmpty()) { |
| // Get the persistent ids for the current region and save them |
| this.currentRegion.persistentIds = getPersistentIds(this.currentRegion.internalRegion); |
| |
| if (this.msgMap == null) { |
| this.msgMap = new HashMap<>(); |
| } |
| { |
| RegionCommitList newRCL = null; |
| Iterator<InternalDistributedMember> it = s.iterator(); |
| while (it.hasNext()) { |
| InternalDistributedMember recipient = it.next(); |
| |
| if (!this.dm.getDistributionManagerIds().contains(recipient)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Skipping member {} due to dist list absence", recipient); |
| } |
| // skip this member since the dm no longer knows about it |
| continue; |
| } |
| RegionCommitList rcl = this.msgMap.get(recipient); |
| if (rcl == null) { |
| if (newRCL == null) { |
| rcl = new RegionCommitList(); |
| rcl.add(this.currentRegion); |
| newRCL = rcl; |
| } else { |
| rcl = newRCL; |
| } |
| this.msgMap.put(recipient, rcl); |
| } else if (rcl.get(rcl.size() - 1) != this.currentRegion) { |
| rcl.add(this.currentRegion); |
| } |
| } |
| } |
| |
| // Now deal with each existing recipient that does not care |
| // about this region |
| Iterator<Map.Entry<InternalDistributedMember, RegionCommitList>> it = |
| this.msgMap.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<InternalDistributedMember, RegionCommitList> me = it.next(); |
| if (!s.contains(me.getKey())) { |
| RegionCommitList rcl = me.getValue(); |
| RegionCommitList trimmedRcl = rcl.trim(this.currentRegion); |
| if (trimmedRcl != rcl) { |
| me.setValue(trimmedRcl); |
| } |
| } |
| } |
| } |
| this.currentRegion = null; |
| } |
| |
| private Map<InternalDistributedMember, PersistentMemberID> getPersistentIds(InternalRegion r) { |
| if (r instanceof DistributedRegion) { |
| return ((CacheDistributionAdvisee) r).getCacheDistributionAdvisor().advisePersistentMembers(); |
| } else { |
| return Collections.emptyMap(); |
| } |
| } |
| |
| void finishRegionComplete() { |
| // make sure we have some changes and someone to send them to |
| if (!this.currentRegion.isEmpty()) { |
| { |
| if (this.regions == null) { |
| this.regions = new RegionCommitList(); |
| } |
| this.regions.add(this.currentRegion); |
| } |
| } |
| this.currentRegion = null; |
| } |
| |
| Map viewVersions = new HashMap(); |
| |
| private Boolean needsLargeModCount; |
| |
| private transient boolean disableListeners = false; |
| |
| /** |
| * record CacheDistributionAdvisor.startOperation versions for later cleanup |
| */ |
| protected void addViewVersion(DistributedRegion dr, long version) { |
| viewVersions.put(dr, version); |
| } |
| |
| protected void releaseViewVersions() { |
| RuntimeException rte = null; |
| for (Iterator it = viewVersions.entrySet().iterator(); it.hasNext();) { |
| Map.Entry e = (Map.Entry) it.next(); |
| DistributedRegion dr = (DistributedRegion) e.getKey(); |
| Long viewVersion = (Long) e.getValue(); |
| // need to continue the iteration if one of the regions is destroyed |
| // since others may still be okay |
| try { |
| dr.getDistributionAdvisor().endOperation(viewVersion); |
| } catch (RuntimeException ex) { |
| rte = ex; |
| } |
| } |
| if (rte != null) { |
| throw rte; |
| } |
| } |
| |
| private boolean isEmpty() { |
| return this.msgMap == null || this.msgMap.isEmpty(); |
| } |
| |
| void addOp(InternalRegion r, Object key, TXEntryState entry, Set otherRecipients) { |
| this.currentRegion.addOp(key, entry); |
| } |
| |
| void send(TXLockId lockId) { |
| if (isEmpty()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("empty transaction - nothing to distribute"); |
| } |
| return; |
| } |
| Assert.assertTrue(this.txState != null, "Send must have transaction state."); |
| this.lockId = (TXLockIdImpl) lockId; |
| updateLockMembers(); |
| |
| IdentityHashMap distMap = new IdentityHashMap(); // Map of RegionCommitList keys to Sets of |
| // receivers |
| HashSet ackReceivers = null; |
| { |
| Iterator it = this.msgMap.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry me = (Map.Entry) it.next(); |
| RegionCommitList rcl = (RegionCommitList) me.getValue(); |
| if (rcl.getNeedsAck()) { |
| if (ackReceivers == null) { |
| ackReceivers = new HashSet(); |
| } |
| ackReceivers.add(me.getKey()); |
| } |
| HashSet receivers = (HashSet) distMap.get(rcl); |
| if (receivers == null) { |
| receivers = new HashSet(); |
| distMap.put(rcl, receivers); |
| } |
| receivers.add(me.getKey()); |
| } |
| } |
| |
| CommitReplyProcessor processor = null; |
| { |
| if (ackReceivers != null) { |
| processor = new CommitReplyProcessor(this.dm, ackReceivers, msgMap); |
| if (ackReceivers.size() > 1) { |
| this.farSiders = ackReceivers; |
| } |
| processor.enableSevereAlertProcessing(); |
| } |
| { |
| Iterator it = distMap.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry me = (Map.Entry) it.next(); |
| RegionCommitList rcl = (RegionCommitList) me.getKey(); |
| HashSet recipients = (HashSet) me.getValue(); |
| // now remove from the recipients any recipients that the dm no |
| // longer knows about |
| recipients.retainAll(this.dm.getDistributionManagerIds()); |
| if (!recipients.isEmpty()) { |
| if (this.txState.internalDuringIndividualSend != null) { |
| // Run in test mode, splitting out individual recipients, |
| // so we can control who gets what |
| Iterator indivRecip = recipients.iterator(); |
| while (indivRecip.hasNext()) { |
| this.txState.internalDuringIndividualSend.run(); |
| setRecipientsSendData(Collections.singleton(indivRecip.next()), processor, rcl); |
| } |
| } else { |
| // Run in normal mode sending to multiple recipients in |
| // one shot |
| setRecipientsSendData(recipients, processor, rcl); |
| } |
| } |
| } |
| } |
| if (this.txState.internalAfterIndividualSend != null) { |
| this.txState.internalAfterIndividualSend.run(); |
| } |
| } |
| |
| if (processor != null) { |
| // Send the CommitProcessMessage |
| final CommitProcessMessage cpMsg; |
| if (this.lockId != null) { |
| cpMsg = new CommitProcessForLockIdMessage(this.lockId); |
| } else { |
| cpMsg = new CommitProcessForTXIdMessage(this.txIdent); |
| } |
| if (this.txState.internalDuringIndividualCommitProcess != null) { |
| // Run in test mode |
| Iterator<InternalDistributedMember> indivRecip = ackReceivers.iterator(); |
| while (indivRecip.hasNext()) { |
| this.txState.internalDuringIndividualCommitProcess.run(); |
| cpMsg.setRecipients(Collections.<InternalDistributedMember>singleton(indivRecip.next())); |
| this.dm.putOutgoing(cpMsg); |
| cpMsg.resetRecipients(); |
| } |
| } else { |
| // Run in normal mode |
| cpMsg.setRecipients(ackReceivers); |
| this.dm.putOutgoing(cpMsg); |
| } |
| |
| if (this.txState.internalAfterIndividualCommitProcess != null) { |
| // Testing callback |
| this.txState.internalAfterIndividualCommitProcess.run(); |
| } |
| |
| // for() loop removed for bug 36983 - you can't loop on waitForReplies() |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| processor.waitForCommitCompletion(); |
| this.dm.getStats().incCommitWaits(); |
| } |
| if (this.hasReliableRegions) { |
| checkDistributionReliability(distMap, processor); |
| } |
| } |
| |
| @Override |
| public boolean containsRegionContentChange() { |
| return true; |
| } |
| |
| /** |
| * Checks reliable regions and throws CommitDistributionException if any required roles may not |
| * have received the commit message. |
| * |
| * @param distMap map of RegionCommitList keys to Sets of receivers |
| * @param processor the reply processor |
| * @throws CommitDistributionException if any required roles may not have received the commit |
| * message |
| */ |
| private void checkDistributionReliability(Map distMap, CommitReplyProcessor processor) { |
| // key=RegionCommit, value=Set of recipients |
| Map regionToRecipients = new IdentityHashMap(); |
| |
| // build up the keys in regionToRecipients and add all receivers |
| for (Iterator distIter = distMap.entrySet().iterator(); distIter.hasNext();) { |
| Map.Entry me = (Map.Entry) distIter.next(); |
| RegionCommitList rcl = (RegionCommitList) me.getKey(); |
| Set recipients = (Set) me.getValue(); |
| |
| for (Iterator rclIter = rcl.iterator(); rclIter.hasNext();) { |
| RegionCommit rc = (RegionCommit) rclIter.next(); |
| // skip region if no required roles |
| if (!rc.internalRegion.requiresReliabilityCheck()) { |
| continue; |
| } |
| |
| Set recipientsForRegion = (Set) regionToRecipients.get(rc); |
| if (recipientsForRegion == null) { |
| recipientsForRegion = new HashSet(); |
| regionToRecipients.put(rc, recipientsForRegion); |
| } |
| |
| // get the receiver Set for rcl and perform addAll |
| if (recipients != null) { |
| recipientsForRegion.addAll(recipients); |
| } |
| } |
| } |
| |
| Set cacheClosedMembers = |
| (processor == null) ? Collections.emptySet() : processor.getCacheClosedMembers(); |
| Set departedMembers = |
| (processor == null) ? Collections.emptySet() : processor.getDepartedMembers(); |
| |
| // check reliability on each region |
| Set regionDistributionExceptions = Collections.emptySet(); |
| Set failedRegionNames = Collections.emptySet(); |
| for (Iterator iter = regionToRecipients.entrySet().iterator(); iter.hasNext();) { |
| Map.Entry me = (Map.Entry) iter.next(); |
| final RegionCommit rc = (RegionCommit) me.getKey(); |
| |
| final Set successfulRecipients = new HashSet(msgMap.keySet()); |
| successfulRecipients.removeAll(departedMembers); |
| |
| // remove members who destroyed that region or closed their cache |
| Set regionDestroyedMembers = (processor == null) ? Collections.emptySet() |
| : processor.getRegionDestroyedMembers(rc.internalRegion.getFullPath()); |
| |
| successfulRecipients.removeAll(cacheClosedMembers); |
| successfulRecipients.removeAll(regionDestroyedMembers); |
| |
| try { |
| rc.internalRegion.handleReliableDistribution(successfulRecipients); |
| } catch (RegionDistributionException e) { |
| if (regionDistributionExceptions == Collections.emptySet()) { |
| regionDistributionExceptions = new HashSet(); |
| failedRegionNames = new HashSet(); |
| } |
| regionDistributionExceptions.add(e); |
| failedRegionNames.add(rc.internalRegion.getFullPath()); |
| } |
| } |
| |
| if (!regionDistributionExceptions.isEmpty()) { |
| throw new CommitDistributionException( |
| String.format( |
| "These regions experienced reliability failure during distribution of the operation: %s", |
| failedRegionNames), |
| regionDistributionExceptions); |
| } |
| } |
| |
| /** |
| * Helper method for send |
| */ |
| private void setRecipientsSendData(Set recipients, ReplyProcessor21 processor, |
| RegionCommitList rcl) { |
| setRecipients(recipients); |
| this.regions = rcl; |
| if (rcl.getNeedsAck()) { |
| this.processorId = processor.getProcessorId(); |
| } else { |
| this.processorId = 0; |
| } |
| this.dm.getStats().incSentCommitMessages(1L); |
| this.sequenceNum++; |
| this.dm.putOutgoing(this); |
| resetRecipients(); |
| } |
| |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| this.dm = dm; |
| // Remove this node from the set of recipients |
| if (this.farSiders != null) { |
| this.farSiders.remove(dm.getId()); |
| } |
| |
| if (this.processorId != 0) { |
| TXLockService.createDTLS(this.dm.getSystem()); // fix bug 38843; no-op if already created |
| synchronized (this) { |
| // Handle potential origin departure |
| this.dm.addMembershipListener(this); |
| // Assume ACK mode, defer processing until we receive a |
| // CommitProcess message |
| if (logger.isDebugEnabled()) { |
| final Object key = getTrackerKey(); |
| logger.debug("Adding key:{} class{} to tracker list", key, key.getClass().getName()); |
| } |
| txTracker.add(this); |
| } |
| if (!this.dm.getDistributionManagerIds().contains(getSender())) { |
| memberDeparted(this.dm, getSender(), false /* don't care */); |
| } |
| |
| } else { |
| basicProcess(); |
| } |
| } |
| |
| /** |
| * Adds an entry op for this tx to do on the far side |
| */ |
| void addFarSideEntryOp(RegionCommit.FarSideEntryOp entryOp) { |
| this.farSideEntryOps.add(entryOp); |
| } |
| |
| protected void addProcessingException(Exception e) { |
| // clear all previous exceptions if e is a CacheClosedException |
| if (this.processingExceptions == Collections.emptySet() || e instanceof CancelException) { |
| this.processingExceptions = new HashSet(); |
| } |
| this.processingExceptions.add(e); |
| } |
| |
| public void setDM(DistributionManager dm) { |
| this.dm = dm; |
| } |
| |
| public void basicProcess() { |
| final DistributionManager dm = this.dm; |
| |
| synchronized (this) { |
| if (isProcessing()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("TXCommitMessage {} is already in process, returning", this); |
| } |
| return; |
| } else { |
| setIsProcessing(true); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("begin processing TXCommitMessage for {}", this.txIdent); |
| } |
| final InitializationLevel oldLevel = |
| LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); |
| boolean forceListener = false; // this gets flipped if we need to fire tx listener |
| // it needs to default to false because we don't want to fire listeners on pr replicates |
| try { |
| TXRmtEvent txEvent = null; |
| final Cache cache = dm.getExistingCache(); |
| if (cache == null) { |
| addProcessingException(new CacheClosedException()); |
| // return ... this cache is closed so we can't do anything. |
| return; |
| } |
| final TransactionListener[] tls = cache.getCacheTransactionManager().getListeners(); |
| if (tls.length > 0) { |
| txEvent = new TXRmtEvent(this.txIdent, cache); |
| } |
| try { |
| // Pre-process each Region in the tx |
| try { |
| Iterator it = this.regions.iterator(); |
| while (it.hasNext()) { |
| boolean failedBeginProcess = true; |
| RegionCommit rc = (RegionCommit) it.next(); |
| try { |
| failedBeginProcess = !rc.beginProcess(dm, this.txIdent, txEvent); |
| } catch (CacheRuntimeException problem) { |
| processCacheRuntimeException(problem); |
| } finally { |
| if (failedBeginProcess) { |
| rc.internalRegion = null; // Cause related FarSideEntryOps to skip processing |
| it.remove(); // Skip endProcessing as well |
| } |
| } |
| } |
| basicProcessOps(); |
| } finally { // fix for bug 40001 |
| // post-process each Region in the tx |
| Iterator it = this.regions.iterator(); |
| while (it.hasNext()) { |
| try { |
| RegionCommit rc = (RegionCommit) it.next(); |
| rc.endProcess(); |
| if (rc.isForceFireEvent(dm)) { |
| forceListener = true; |
| } |
| } catch (CacheRuntimeException problem) { |
| processCacheRuntimeException(problem); |
| } |
| } |
| } |
| |
| /* |
| * We need to make sure that we should fire a TX afterCommit event. |
| */ |
| boolean internalEvent = (txEvent != null && txEvent.hasOnlyInternalEvents()); |
| if (!disableListeners && !internalEvent |
| && (forceListener || (txEvent != null && !txEvent.isEmpty()))) { |
| for (int i = 0; i < tls.length; i++) { |
| try { |
| tls[i].afterCommit(txEvent); |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.error("Exception occurred in TransactionListener", |
| t); |
| } |
| } |
| } |
| } catch (CancelException e) { |
| processCacheRuntimeException(e); |
| } finally { |
| if (txEvent != null) { |
| txEvent.freeOffHeapResources(); |
| } |
| } |
| } finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| if (isAckRequired()) { |
| ack(); |
| } |
| if (!dm.getExistingCache().isClient() && bridgeContext != null) { |
| getTracker().saveTXForClientFailover(txIdent, this); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("completed processing TXCommitMessage for {}", this.txIdent); |
| } |
| } |
| } |
| |
| public void basicProcessOps() { |
| List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size()); |
| Collections.sort(this.farSideEntryOps); |
| Iterator it = this.farSideEntryOps.iterator(); |
| while (it.hasNext()) { |
| try { |
| RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp) it.next(); |
| entryOp.process(pendingCallbacks); |
| } catch (CacheRuntimeException problem) { |
| processCacheRuntimeException(problem); |
| } catch (Exception e) { |
| addProcessingException(e); |
| } |
| } |
| firePendingCallbacks(pendingCallbacks); |
| } |
| |
| private void firePendingCallbacks(List<EntryEventImpl> callbacks) { |
| Iterator<EntryEventImpl> ci = callbacks.iterator(); |
| while (ci.hasNext()) { |
| EntryEventImpl ee = ci.next(); |
| try { |
| if (ee.getOperation().isDestroy()) { |
| ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true); |
| } else if (ee.getOperation().isInvalidate()) { |
| ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true); |
| } else if (ee.getOperation().isCreate()) { |
| ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true); |
| } else { |
| ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true); |
| } |
| } finally { |
| ee.release(); |
| } |
| } |
| } |
| |
| protected void processCacheRuntimeException(CacheRuntimeException problem) { |
| if (problem instanceof RegionDestroyedException) { // catch RegionDestroyedException |
| addProcessingException(problem); |
| } else if (problem instanceof CancelException) { // catch CacheClosedException |
| addProcessingException(problem); |
| throw problem; |
| } else { // catch CacheRuntimeException |
| addProcessingException(problem); |
| logger.error( |
| "Transaction message {} from sender {} failed processing, unknown transaction state: {}", |
| new Object[] {this, getSender(), problem}); |
| } |
| } |
| |
| private void ack() { |
| if (this.processorId != 0) { |
| CommitReplyException replyEx = null; |
| if (!this.processingExceptions.isEmpty()) { |
| replyEx = new CommitReplyException( |
| String.format("Commit operation generated one or more exceptions from %s", |
| this.getSender()), |
| this.processingExceptions); |
| } |
| ReplyMessage.send(getSender(), this.processorId, replyEx, this.dm); |
| } |
| } |
| |
| @Override |
| public int getDSFID() { |
| // on near side send old TX_COMMIT_MESSAGE if there is at least one 7.0 |
| // member in the system, otherwise send the new 7.0.1 message. |
| // 7.0.1 members will be able to deserialize either |
| // if (shouldSend701Message()) { |
| // this.shouldWriteShadowKey = true; |
| // return TX_COMMIT_MESSAGE_701; |
| return TX_COMMIT_MESSAGE; |
| /* |
| * } this.shouldWriteShadowKey = false; return TX_COMMIT_MESSAGE; |
| */ |
| } |
| |
| /* |
| * /** Do not send shadowKey to clients or when there are member(s) older than 7.0.1. |
| * |
| * private boolean shouldSend701Message() { if (this.clientVersion == null && |
| * this.getDM().getMembersWithOlderVersion("7.0.1").isEmpty()) { return true; } return false; } |
| * |
| * public boolean shouldReadShadowKey() { return this.shouldReadShadowKey; } |
| * |
| * public void setShouldReadShadowKey(boolean shouldReadShadowKey) { this.shouldReadShadowKey = |
| * shouldReadShadowKey; } |
| * |
| * public boolean shouldWriteShadowKey() { return this.shouldWriteShadowKey; } |
| */ |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| int pId = in.readInt(); |
| |
| if (isAckRequired()) { |
| this.processorId = pId; |
| ReplyProcessor21.setMessageRPId(this.processorId); |
| } else { |
| this.processorId = -1; |
| } |
| |
| this.txIdent = TXId.createFromData(in); |
| if (in.readBoolean()) { |
| this.lockId = TXLockIdImpl.createFromData(in); |
| } |
| int totalMaxSize = in.readInt(); |
| |
| this.farsideBaseMembershipId = DataSerializer.readByteArray(in); |
| this.farsideBaseThreadId = in.readLong(); |
| this.farsideBaseSequenceId = in.readLong(); |
| |
| this.needsLargeModCount = in.readBoolean(); |
| |
| final boolean hasShadowKeys = hasFlagsField(in) ? in.readBoolean() : useShadowKey(); |
| |
| int regionsSize = in.readInt(); |
| this.regions = new ArrayList(regionsSize); |
| this.farSideEntryOps = new ArrayList(totalMaxSize); |
| for (int i = 0; i < regionsSize; i++) { |
| RegionCommit rc = new RegionCommit(this); |
| try { |
| rc.fromData(in, hasShadowKeys); |
| } catch (CacheClosedException cce) { |
| addProcessingException(cce); |
| // return to avoid serialization error being sent in reply |
| return; |
| } |
| this.regions.add(rc); |
| } |
| |
| this.bridgeContext = ClientProxyMembershipID.readCanonicalized(in); |
| this.farSiders = DataSerializer.readHashSet(in); |
| } |
| |
| /** |
| * Return true if a distributed ack message is required. On the client side of a transaction, this |
| * returns false, while returning true elsewhere. |
| * |
| * @return requires ack message or not |
| */ |
| private boolean isAckRequired() { |
| return this.ackRequired; |
| } |
| |
| |
| /** |
| * Indicate whether an ack is required. Defaults to true. |
| * |
| * @param a true if we require an ack. false if not. false on clients. |
| */ |
| public void setAckRequired(boolean a) { |
| this.ackRequired = a; |
| if (!a) { |
| this.processorId = -1; |
| } |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| out.writeInt(this.processorId); |
| InternalDataSerializer.invokeToData(this.txIdent, out); |
| { |
| boolean hasLockId = this.lockId != null; |
| out.writeBoolean(hasLockId); |
| if (hasLockId) { |
| InternalDataSerializer.invokeToData(this.lockId, out); |
| } |
| } |
| int regionsSize = 0; |
| { |
| int totalMaxSize = 0; |
| if (this.regions != null) { |
| regionsSize = this.regions.size(); |
| for (int i = 0; i < this.regions.size(); i++) { |
| RegionCommit rc = (RegionCommit) this.regions.get(i); |
| totalMaxSize += rc.maxSize; |
| } |
| } |
| out.writeInt(totalMaxSize); |
| } |
| |
| if (this.txState != null) { |
| DataSerializer.writeByteArray(this.txState.getBaseMembershipId(), out); |
| out.writeLong(this.txState.getBaseThreadId()); |
| out.writeLong(this.txState.getBaseSequenceId()); |
| } else { |
| DataSerializer.writeByteArray(this.farsideBaseMembershipId, out); |
| out.writeLong(this.farsideBaseThreadId); |
| out.writeLong(this.farsideBaseSequenceId); |
| } |
| |
| if (this.txState != null) { |
| DataSerializer.writeBoolean(this.txState.needsLargeModCount(), out); |
| } else { |
| DataSerializer.writeBoolean(this.needsLargeModCount, out); |
| } |
| |
| final boolean useShadowKey = useShadowKey(); |
| if (hasFlagsField(out)) { |
| out.writeBoolean(useShadowKey); |
| } |
| |
| out.writeInt(regionsSize); |
| { |
| if (regionsSize > 0) { |
| for (int i = 0; i < this.regions.size(); i++) { |
| RegionCommit rc = (RegionCommit) this.regions.get(i); |
| rc.toData(out, context, useShadowKey); |
| } |
| } |
| } |
| |
| DataSerializer.writeObject(bridgeContext, out); |
| |
| DataSerializer.writeHashSet(this.farSiders, out); |
| } |
| |
| private boolean hasFlagsField(final DataOutput out) { |
| return hasFlagsField(InternalDataSerializer.getVersionForDataStream(out)); |
| } |
| |
| private boolean hasFlagsField(final DataInput in) { |
| return hasFlagsField(InternalDataSerializer.getVersionForDataStream(in)); |
| } |
| |
| private boolean hasFlagsField(final Version version) { |
| return version.compareTo(Version.GEODE_1_7_0) >= 0; |
| } |
| |
| private boolean useShadowKey() { |
| return null == clientVersion; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(256); |
| result.append("TXCommitMessage@").append(System.identityHashCode(this)).append("#") |
| .append(this.sequenceNum).append(" processorId=").append(this.processorId).append(" txId=") |
| .append(this.txIdent); |
| |
| if (this.farSiders != null) { |
| Iterator fs = this.farSiders.iterator(); |
| result.append(" farSiders="); |
| while (fs.hasNext()) { |
| result.append(fs.next()); |
| if (fs.hasNext()) { |
| result.append(' '); |
| } |
| } |
| } else { |
| result.append(" farSiders=<null>"); |
| } |
| if (this.regions != null) { |
| Iterator it = this.regions.iterator(); |
| while (it.hasNext()) { |
| result.append(' ').append(it.next()); |
| } |
| } |
| return result.toString(); |
| } |
| |
| /** |
| * Combines a set of small TXCommitMessages that belong to one transaction into a txCommitMessage |
| * that represents an entire transaction. At commit time the txCommitMessage sent to each node can |
| * be a subset of the transaction, this method will combine those subsets into a complete |
| * message. |
| * |
| * @return the complete txCommitMessage |
| */ |
| public static TXCommitMessage combine(Set<TXCommitMessage> msgSet) { |
| assert msgSet != null; |
| TXCommitMessage firstPart = null; |
| Iterator<TXCommitMessage> it = msgSet.iterator(); |
| while (it.hasNext()) { |
| if (firstPart == null) { |
| firstPart = it.next(); |
| continue; |
| } |
| firstPart.combine(it.next()); |
| } |
| return firstPart; |
| } |
| |
| /** |
| * Combines the other TXCommitMessage into this message. Used to compute complete TXCommitMessage |
| * from parts. |
| */ |
| public void combine(TXCommitMessage other) { |
| assert other != null; |
| Iterator it = other.regions.iterator(); |
| Map<String, RegionCommit> regionCommits = new HashMap<>(); |
| for (RegionCommit commit : regions) { |
| regionCommits.put(commit.getRegionPath(), commit); |
| } |
| for (RegionCommit commit : other.regions) { |
| if (!regionCommits.containsKey(commit.getRegionPath())) { |
| commit.msg = this; |
| this.regions.add(commit); |
| regionCommits.put(commit.getRegionPath(), commit); |
| } |
| } |
| } |
| |
| public static class RegionCommitList extends ArrayList<RegionCommit> { |
| private static final long serialVersionUID = -8910813949027683641L; |
| private transient boolean needsAck = false; |
| private transient RegionCommit trimRC = null; |
| private transient RegionCommitList trimChild = null; |
| |
| public RegionCommitList() { |
| super(); |
| } |
| |
| public RegionCommitList(RegionCommitList c) { |
| super(c); |
| } |
| |
| public boolean getNeedsAck() { |
| return this.needsAck; |
| } |
| |
| @Override // GemStoneAddition |
| public boolean add(RegionCommit o) { |
| RegionCommit rc = (RegionCommit) o; |
| rc.incRefCount(); |
| if (!this.needsAck && rc.needsAck()) { |
| this.needsAck = true; |
| } |
| return super.add(o); |
| } |
| |
| /** |
| * Creates a new list, if needed, that contains all the elements of the specified old list |
| * except the last one if it is 'rc'. Also recomputes needsAck field. |
| */ |
| public RegionCommitList trim(RegionCommit rc) { |
| if (get(size() - 1) != rc) { |
| // no need to trim because it does not contain rc |
| return this; |
| } |
| if (this.trimRC == rc) { |
| return this.trimChild; |
| } |
| RegionCommitList result = new RegionCommitList(this); |
| this.trimRC = rc; |
| this.trimChild = result; |
| result.remove(result.size() - 1); |
| { |
| Iterator it = result.iterator(); |
| while (it.hasNext()) { |
| RegionCommit itrc = (RegionCommit) it.next(); |
| itrc.incRefCount(); |
| if (itrc.needsAck()) { |
| result.needsAck = true; |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(256); |
| result.append('@').append(System.identityHashCode(this)).append(' ').append(super.toString()); |
| return result.toString(); |
| } |
| } |
| |
| public static class RegionCommit { |
| private final TxCallbackEventFactory txCallbackEventFactory = new TxCallbackEventFactoryImpl(); |
| /** |
| * The region that this commit represents. Valid on both nearside and farside. |
| */ |
| protected transient InternalRegion internalRegion; |
| /** |
| * Valid only on farside. |
| */ |
| private String regionPath; |
| private String parentRegionPath; |
| /** |
| * The message this region commit is a part of. Valid on both farside and nearside. |
| */ |
| private transient TXCommitMessage msg; |
| /** |
| * Number of RegionCommitList instances that have this RegionCommit in them Valid only on |
| * nearside. |
| */ |
| private transient int refCount = 0; |
| /** |
| * Valid only on nearside. |
| */ |
| private transient HeapDataOutputStream preserializedBuffer = null; |
| /** |
| * Upperbound on the number of operations this region could possibly have Valid only on |
| * nearside. |
| */ |
| transient int maxSize; |
| /** |
| * A list of Object; each one is the entry key for a distributed operation done by this |
| * transaction. The list must be kept in sync with opKeys. Valid only on nearside. |
| */ |
| private transient ArrayList opKeys; |
| /** |
| * A list of TXEntryState; each one is the entry info for a distributed operation done by this |
| * transaction. The list must be kept in sync with opKeys. Valid only on nearside. |
| */ |
| private transient ArrayList opEntries; |
| |
| private transient VersionSource memberId; |
| |
| /** |
| * The persistent ids of the peers for this region. Used to mark peers as offline if they do not |
| * apply the commit due to a cache close. |
| */ |
| public Map<InternalDistributedMember, PersistentMemberID> persistentIds; |
| |
| /** |
| * Used on nearside |
| */ |
| RegionCommit(TXCommitMessage msg, InternalRegion r, int maxSize) { |
| this.msg = msg; |
| this.internalRegion = r; |
| this.maxSize = maxSize; |
| } |
| |
| /** |
| * Used on farside who inits r later and never sets maxSize |
| */ |
| RegionCommit(TXCommitMessage msg) { |
| this.msg = msg; |
| } |
| |
| public String getRegionPath() { |
| return regionPath; |
| } |
| |
| public void incRefCount() { |
| this.refCount++; |
| } |
| |
| /** |
| * Valid on farside after beginProcess. Used to remember what to do at region cleanup time |
| */ |
| private boolean needsUnlock; |
| /** |
| * Valid on farside after beginProcess. Used to remember what to do at region cleanup time |
| */ |
| private boolean needsLRUEnd; |
| /** |
| * Valid on farside after beginProcess This is the txEvent that should be used by this |
| * RegionCommit |
| */ |
| private TXRmtEvent txEvent; |
| |
| /** |
| * Called to setup a region commit so its entryOps can be processed |
| * |
| * @return true if region should be processed; false if it can be ignored |
| * @throws CacheClosedException if the cache has been closed |
| */ |
| boolean beginProcess(DistributionManager dm, TransactionId txIdent, TXRmtEvent txEvent) |
| throws CacheClosedException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("begin processing TXCommitMessage {} for region {}", txIdent, this.regionPath); |
| } |
| try { |
| if (!hookupRegion(dm)) { |
| return false; |
| } |
| if (msg.isAckRequired() |
| && (this.internalRegion == null || !this.internalRegion.getScope().isDistributed())) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Received unneeded commit data for region {}", this.regionPath); |
| } |
| this.msg.addProcessingException(new RegionDestroyedException( |
| "Region not found", |
| this.regionPath)); |
| this.internalRegion = null; |
| return false; |
| } |
| this.needsUnlock = this.internalRegion.lockGII(); |
| this.internalRegion.txLRUStart(); |
| this.needsLRUEnd = true; |
| if (this.internalRegion.isInitialized()) { |
| // We don't want the txEvent to know anything about our regions |
| // that are still doing gii. |
| this.txEvent = txEvent; |
| } |
| } catch (RegionDestroyedException e) { |
| this.msg.addProcessingException(e); |
| // Region destroyed: Update cancelled |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Received unneeded commit data for region {} because the region was destroyed.", |
| this.regionPath, e); |
| } |
| this.internalRegion = null; |
| } |
| return this.internalRegion != null; |
| } |
| |
| private boolean hookupRegion(DistributionManager dm) { |
| this.internalRegion = getRegionByPath(dm, regionPath); |
| if (this.internalRegion == null && this.parentRegionPath != null) { |
| this.internalRegion = getRegionByPath(dm, this.parentRegionPath); |
| this.regionPath = this.parentRegionPath; |
| } |
| if (this.internalRegion == null && dm.getSystem().isLoner()) { |
| // If there are additional regions that the server enlisted in the tx, |
| // which the client does not have, the client can just ignore the region |
| // see bug 51922 |
| return false; |
| } |
| return true; |
| } |
| |
| LocalRegion getRegionByPath(DistributionManager dm, String regionPath) { |
| InternalCache cache = dm.getCache(); |
| return cache == null ? null : (LocalRegion) cache.getRegionByPath(regionPath); |
| } |
| |
| /** |
| * Called when processing is complete; only needs to be called if beginProcess returned true. |
| */ |
| void endProcess() { |
| if (this.internalRegion != null) { |
| try { |
| if (this.needsLRUEnd) { |
| this.needsLRUEnd = false; |
| this.internalRegion.txLRUEnd(); |
| } |
| } finally { |
| if (this.needsUnlock) { |
| this.needsUnlock = false; |
| this.internalRegion.unlockGII(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the eventId to use for the give farside entry op. |
| * |
| * @since GemFire 5.7 |
| */ |
| private EventID getEventId(FarSideEntryOp entryOp) { |
| return this.msg.getEventId(entryOp.eventOffset); |
| } |
| |
| |
| /** |
| * Apply a single tx entry op on the far side |
| */ |
| @SuppressWarnings("synthetic-access") |
| protected void txApplyEntryOp(FarSideEntryOp entryOp, List<EntryEventImpl> pendingCallbacks) { |
| if (this.internalRegion == null) { |
| return; |
| } |
| EventID eventID = getEventId(entryOp); |
| boolean isDuplicate = this.internalRegion.hasSeenEvent(eventID); |
| boolean callbacksOnly = |
| (this.internalRegion.getDataPolicy() == DataPolicy.PARTITION) || isDuplicate; |
| if (this.internalRegion instanceof PartitionedRegion) { |
| /* |
| * This happens when we don't have the bucket and are getting adjunct notification |
| */ |
| // No need to release because it is added to pendingCallbacks and they will be released |
| // later |
| EntryEventImpl eei = |
| txCallbackEventFactory.createCallbackEvent(this.internalRegion, entryOp.op, |
| entryOp.key, |
| entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg, |
| entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag, |
| entryOp.tailKey); |
| if (entryOp.filterRoutingInfo != null) { |
| eei.setLocalFilterInfo( |
| entryOp.filterRoutingInfo.getFilterInfo(this.internalRegion.getCache().getMyId())); |
| } |
| if (isDuplicate) { |
| eei.setPossibleDuplicate(true); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("invoking transactional callbacks for {} key={} needsUnlock={} event={}", |
| entryOp.op, entryOp.key, this.needsUnlock, eei); |
| } |
| // we reach this spot because the event is either delivered to this member |
| // as an "adjunct" message or because the bucket was being created when |
| // the message was sent and already reflects the change caused by this event. |
| // In the latter case we need to invoke listeners |
| final boolean skipListeners = !isDuplicate; |
| eei.setInvokePRCallbacks(!skipListeners); |
| pendingCallbacks.add(eei); |
| return; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("applying transactional {} key={} needsUnlock={} eventId {} with routing {}", |
| entryOp.op, entryOp.key, this.needsUnlock, getEventId(entryOp), |
| entryOp.filterRoutingInfo); |
| } |
| if (entryOp.versionTag != null) { |
| entryOp.versionTag.replaceNullIDs(this.msg.getSender()); |
| } |
| if (entryOp.op.isDestroy()) { |
| this.internalRegion.txApplyDestroy(entryOp.key, this.msg.txIdent, this.txEvent, |
| this.needsUnlock, |
| entryOp.op, getEventId(entryOp), entryOp.callbackArg, pendingCallbacks, |
| entryOp.filterRoutingInfo, this.msg.bridgeContext, false /* origin remote */, |
| null/* txEntryState */, entryOp.versionTag, entryOp.tailKey); |
| } else if (entryOp.op.isInvalidate()) { |
| this.internalRegion.txApplyInvalidate(entryOp.key, Token.INVALID, entryOp.didDestroy, |
| this.msg.txIdent, |
| this.txEvent, false /* localOp */, getEventId(entryOp), entryOp.callbackArg, |
| pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext, |
| null/* txEntryState */, entryOp.versionTag, entryOp.tailKey); |
| } else { |
| this.internalRegion.txApplyPut(entryOp.op, entryOp.key, entryOp.value, entryOp.didDestroy, |
| this.msg.txIdent, this.txEvent, getEventId(entryOp), entryOp.callbackArg, |
| pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext, |
| null/* txEntryState */, entryOp.versionTag, entryOp.tailKey); |
| } |
| } |
| |
| /** |
| * Apply a single tx entry op on the far side |
| */ |
| @SuppressWarnings("synthetic-access") |
| protected void txApplyEntryOpAdjunctOnly(FarSideEntryOp entryOp) { |
| if (this.internalRegion == null) { |
| return; |
| } |
| EventID eventID = getEventId(entryOp); |
| boolean isDuplicate = this.internalRegion.hasSeenEvent(eventID); |
| boolean callbacksOnly = |
| (this.internalRegion.getDataPolicy() == DataPolicy.PARTITION) || isDuplicate; |
| if (this.internalRegion instanceof PartitionedRegion) { |
| |
| PartitionedRegion pr = (PartitionedRegion) internalRegion; |
| BucketRegion br = pr.getBucketRegion(entryOp.key); |
| Set bucketOwners = br.getBucketOwners(); |
| InternalDistributedMember thisMember = this.internalRegion.getDistributionManager().getId(); |
| if (bucketOwners.contains(thisMember)) { |
| return; |
| } |
| |
| /* |
| * This happens when we don't have the bucket and are getting adjunct notification |
| */ |
| @Released |
| EntryEventImpl eei = |
| txCallbackEventFactory.createCallbackEvent(this.internalRegion, entryOp.op, |
| entryOp.key, |
| entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg, |
| entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag, |
| entryOp.tailKey); |
| try { |
| if (entryOp.filterRoutingInfo != null) { |
| eei.setLocalFilterInfo( |
| entryOp.filterRoutingInfo.getFilterInfo(this.internalRegion.getCache().getMyId())); |
| } |
| if (isDuplicate) { |
| eei.setPossibleDuplicate(true); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("invoking transactional callbacks for {} key={} needsUnlock={} event={}", |
| entryOp.op, entryOp.key, this.needsUnlock, eei); |
| } |
| // we reach this spot because the event is either delivered to this member |
| // as an "adjunct" message or because the bucket was being created when |
| // the message was sent and already reflects the change caused by this event. |
| // In the latter case we need to invoke listeners |
| final boolean skipListeners = !isDuplicate; |
| eei.invokeCallbacks(this.internalRegion, skipListeners, true); |
| } finally { |
| eei.release(); |
| } |
| return; |
| } |
| } |
| |
| boolean isEmpty() { |
| return this.opKeys == null; |
| } |
| |
| boolean needsAck() { |
| return this.internalRegion.getScope().isDistributedAck(); |
| } |
| |
| void addOp(Object key, TXEntryState entry) { |
| if (this.opKeys == null) { |
| this.opKeys = new ArrayList(this.maxSize); |
| this.opEntries = new ArrayList(this.maxSize); |
| } |
| this.opKeys.add(key); |
| this.opEntries.add(entry); |
| } |
| |
| |
| public boolean isForceFireEvent(DistributionManager dm) { |
| LocalRegion r = getRegionByPath(dm, regionPath); |
| if (r instanceof PartitionedRegion || (r != null && r.isUsedForPartitionedRegionBucket())) { |
| return false; |
| } |
| return true; |
| } |
| |
| public void fromData(DataInput in, boolean hasShadowKey) |
| throws IOException, ClassNotFoundException { |
| this.regionPath = DataSerializer.readString(in); |
| this.parentRegionPath = DataSerializer.readString(in); |
| |
| int size = in.readInt(); |
| if (size > 0) { |
| this.opKeys = new ArrayList(size); |
| this.opEntries = new ArrayList(size); |
| final boolean largeModCount = in.readBoolean(); |
| this.memberId = DataSerializer.readObject(in); |
| for (int i = 0; i < size; i++) { |
| FarSideEntryOp entryOp = new FarSideEntryOp(); |
| // shadowkey is not being sent to clients |
| entryOp.fromData(in, largeModCount, hasShadowKey); |
| if (entryOp.versionTag != null && this.memberId != null) { |
| entryOp.versionTag.setMemberID(this.memberId); |
| } |
| this.msg.addFarSideEntryOp(entryOp); |
| this.opKeys.add(entryOp.key); |
| this.opEntries.add(entryOp); |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(64); |
| if (this.regionPath != null) { |
| result.append(this.regionPath); |
| } else { |
| result.append(this.internalRegion.getFullPath()); |
| } |
| if (this.refCount > 0) { |
| result.append(" refCount=").append(this.refCount); |
| } |
| return result.toString(); |
| } |
| |
| private void basicToData(DataOutput out, |
| SerializationContext context, |
| boolean useShadowKey) throws IOException { |
| if (this.internalRegion != null) { |
| DataSerializer.writeString(this.internalRegion.getFullPath(), out); |
| if (this.internalRegion instanceof BucketRegion) { |
| DataSerializer.writeString( |
| ((Bucket) this.internalRegion).getPartitionedRegion().getFullPath(), out); |
| } else { |
| DataSerializer.writeString(null, out); |
| } |
| } else { |
| DataSerializer.writeString(this.regionPath, out); |
| DataSerializer.writeString(this.parentRegionPath, out); |
| } |
| |
| if (isEmpty() || this.opKeys.size() == 0) { |
| out.writeInt(0); |
| } else { |
| int size = this.opKeys.size(); |
| out.writeInt(size); |
| |
| final boolean largeModCount; |
| if (this.msg.txState != null) { |
| largeModCount = this.msg.txState.needsLargeModCount(); |
| } else { |
| largeModCount = this.msg.needsLargeModCount; |
| } |
| out.writeBoolean(largeModCount); |
| |
| final boolean sendVersionTags = |
| this.msg.clientVersion == null || Version.GFE_70.compareTo(this.msg.clientVersion) <= 0; |
| if (sendVersionTags) { |
| VersionSource member = this.memberId; |
| if (member == null) { |
| if (this.internalRegion == null) { |
| Assert.assertTrue(this.msg.txState == null); |
| } else { |
| member = this.internalRegion.getVersionMember(); |
| } |
| } |
| DataSerializer.writeObject(member, out); |
| } |
| for (int i = 0; i < size; i++) { |
| DataSerializer.writeObject(this.opKeys.get(i), out); |
| if (this.msg.txState != null) { |
| /* we are still on tx node and have the entry state */ |
| ((TXEntryState) this.opEntries.get(i)).toFarSideData(out, context, largeModCount, |
| sendVersionTags, useShadowKey); |
| } else { |
| ((FarSideEntryOp) this.opEntries.get(i)).toData(out, largeModCount, sendVersionTags, |
| useShadowKey); |
| } |
| } |
| } |
| } |
| |
| |
| public void toData(DataOutput out, SerializationContext context, boolean useShadowKey) |
| throws IOException { |
| if (this.preserializedBuffer != null) { |
| this.preserializedBuffer.rewind(); |
| this.preserializedBuffer.sendTo(out); |
| } else if (this.refCount > 1) { |
| Version v = InternalDataSerializer.getVersionForDataStream(out); |
| HeapDataOutputStream hdos = new HeapDataOutputStream(1024, v); |
| basicToData(hdos, context, useShadowKey); |
| this.preserializedBuffer = hdos; |
| this.preserializedBuffer.sendTo(out); |
| } else { |
| basicToData(out, context, useShadowKey); |
| } |
| } |
| |
| /** |
| * Holds data that describes a tx entry op on the far side. |
| * |
| * @since GemFire 5.0 |
| */ |
| public class FarSideEntryOp implements Comparable { |
| public Operation op; |
| public int modSerialNum; |
| public int eventOffset; |
| public Object key; |
| public Object value; |
| public boolean didDestroy; |
| public Object callbackArg; |
| private FilterRoutingInfo filterRoutingInfo; |
| private VersionTag versionTag; |
| private long tailKey; |
| |
| /** |
| * Create a new representation of a tx entry op on the far side. All init will be done by a |
| * call to fromData |
| */ |
| public FarSideEntryOp() {} |
| |
| /** |
| * Creates and returns a new instance of a tx entry op on the far side. The "toData" that this |
| * should match is {@link TXEntryState#toFarSideData}. |
| * |
| * @param in the data input that is used to read the data for this entry op |
| * @param largeModCount true if the mod count is a int instead of a byte. |
| * @param readShadowKey true if a long shadowKey should be read |
| */ |
| public void fromData(DataInput in, boolean largeModCount, boolean readShadowKey) |
| throws IOException, ClassNotFoundException { |
| this.key = DataSerializer.readObject(in); |
| this.op = Operation.fromOrdinal(in.readByte()); |
| if (largeModCount) { |
| this.modSerialNum = in.readInt(); |
| } else { |
| this.modSerialNum = in.readByte(); |
| } |
| this.callbackArg = DataSerializer.readObject(in); |
| this.filterRoutingInfo = DataSerializer.readObject(in); |
| this.versionTag = DataSerializer.readObject(in); |
| if (readShadowKey) { |
| this.tailKey = in.readLong(); |
| } |
| this.eventOffset = in.readInt(); |
| if (!this.op.isDestroy()) { |
| this.didDestroy = in.readBoolean(); |
| if (!this.op.isInvalidate()) { |
| boolean isTokenOrByteArray = in.readBoolean(); |
| if (isTokenOrByteArray) { |
| // token or byte[] |
| this.value = DataSerializer.readObject(in); |
| } else { |
| // CachedDeserializable, Object, or PDX |
| this.value = CachedDeserializableFactory.create(DataSerializer.readByteArray(in), |
| GemFireCacheImpl.getInstance()); |
| } |
| } |
| } |
| } |
| |
| public void toData(DataOutput out, boolean largeModCount, boolean sendVersionTag, |
| boolean sendShadowKey) throws IOException { |
| // DataSerializer.writeObject(this.key,out); |
| /* Don't serialize key because caller did that already */ |
| |
| out.writeByte(this.op.ordinal); |
| if (largeModCount) { |
| out.writeInt(this.modSerialNum); |
| } else { |
| out.writeByte(this.modSerialNum); |
| } |
| DataSerializer.writeObject(this.callbackArg, out); |
| DataSerializer.writeObject(this.filterRoutingInfo, out); |
| if (sendVersionTag) { |
| DataSerializer.writeObject(this.versionTag, out); |
| } |
| if (sendShadowKey) { |
| out.writeLong(this.tailKey); |
| } |
| out.writeInt(this.eventOffset); |
| if (!this.op.isDestroy()) { |
| out.writeBoolean(this.didDestroy); |
| if (!this.op.isInvalidate()) { |
| boolean sendObject = Token.isInvalidOrRemoved(this.value); |
| sendObject = sendObject || this.value instanceof byte[]; |
| out.writeBoolean(sendObject); |
| if (sendObject) { |
| DataSerializer.writeObject(this.value, out); |
| } else { |
| DataSerializer.writeObjectAsByteArray(this.value, out); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * Performs this entryOp on the farside of a tx commit. |
| */ |
| public void process(List<EntryEventImpl> pendingCallbacks) { |
| txApplyEntryOp(this, pendingCallbacks); |
| } |
| |
| public void processAdjunctOnly() { |
| txApplyEntryOpAdjunctOnly(this); |
| } |
| |
| public RegionCommit getRegionCommit() { |
| return RegionCommit.this; |
| } |
| |
| /** |
| * Returns the value to use to sort us |
| */ |
| private int getSortValue() { |
| return this.modSerialNum; |
| } |
| |
| @Override |
| public int compareTo(Object o) { |
| FarSideEntryOp other = (FarSideEntryOp) o; |
| return getSortValue() - other.getSortValue(); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o == null || !(o instanceof FarSideEntryOp)) { |
| return false; |
| } |
| return compareTo(o) == 0; |
| } |
| |
| @Override |
| public int hashCode() { |
| return getSortValue(); |
| } |
| } |
| } |
| |
| Object getTrackerKey() { |
| if (this.lockId != null) { |
| return this.lockId; |
| } else { |
| return this.txIdent; |
| } |
| } |
| |
| /** |
| * Used to prevent processing of the message if we have reported to other FarSiders that we did |
| * not received the CommitProcessMessage |
| */ |
| boolean dontProcess() { |
| return this.dontProcess; |
| } |
| |
| /** |
| * Indicate that this message should not be processed if we receive CommitProcessMessage (late) |
| */ |
| void setDontProcess() { |
| this.dontProcess = true; |
| } |
| |
| boolean isProcessing() { |
| return this.isProcessing; |
| } |
| |
| private void setIsProcessing(boolean isProcessing) { |
| this.isProcessing = isProcessing; |
| } |
| |
| boolean wasProcessed() { |
| return this.wasProcessed; |
| } |
| |
| void setProcessed(boolean wasProcessed) { |
| this.wasProcessed = wasProcessed; |
| } |
| |
| /** |
| * The CommitProcessForLockIDMessaage is sent by the Distributed ACK TX origin to the recipients |
| * (aka FarSiders) to indicate that a previously received RegionCommit that contained a lockId |
| * should commence processing. |
| */ |
| public static class CommitProcessForLockIdMessage extends CommitProcessMessage { |
| private TXLockId lockId; |
| |
| public CommitProcessForLockIdMessage() { |
| // Zero arg constructor for DataSerializer |
| } |
| |
| public CommitProcessForLockIdMessage(TXLockId lockId) { |
| this.lockId = lockId; |
| Assert.assertTrue(this.lockId != null, |
| "CommitProcessForLockIdMessage must have a non-null lockid!"); |
| } |
| |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| final TXCommitMessage mess = waitForMessage(this.lockId, dm); |
| Assert.assertTrue(mess != null, "Commit data for TXLockId: " + this.lockId + " not found"); |
| basicProcess(mess, dm); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return COMMIT_PROCESS_FOR_LOCKID_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| InternalDataSerializer.invokeToData(this.lockId, out); |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| this.lockId = TXLockIdImpl.createFromData(in); |
| Assert.assertTrue(this.lockId != null, |
| "CommitProcessForLockIdMessage must have a non-null lockid!"); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(128); |
| result.append("CommitProcessForLockIdMessage@").append(System.identityHashCode(this)) |
| .append(" lockId=").append(this.lockId); |
| return result.toString(); |
| } |
| } |
| |
| /** |
| * The CommitProcessForTXIdMessaage is sent by the Distributed ACK TX origin to the recipients |
| * (aka FarSiders) to indicate that a previously received RegionCommit that contained a TXId |
| * should commence processing. RegionCommit messages that contain a TXId (and no TXLockId) are |
| * typically sent if all the TX changes are a result of load/netsearch/netload values (thus no |
| * lockid) |
| */ |
| public static class CommitProcessForTXIdMessage extends CommitProcessMessage { |
| private TXId txId; |
| |
| public CommitProcessForTXIdMessage() { |
| // Zero arg constructor for DataSerializer |
| } |
| |
| public CommitProcessForTXIdMessage(TXId txId) { |
| this.txId = txId; |
| Assert.assertTrue(this.txId != null, |
| "CommitProcessMessageForTXId must have a non-null txid!"); |
| } |
| |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| final TXCommitMessage mess = waitForMessage(this.txId, dm); |
| Assert.assertTrue(mess != null, "Commit data for TXId: " + this.txId + " not found"); |
| basicProcess(mess, dm); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return COMMIT_PROCESS_FOR_TXID_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| InternalDataSerializer.invokeToData(this.txId, out); |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| this.txId = TXId.createFromData(in); |
| Assert.assertTrue(this.txId != null, |
| "CommitProcessMessageForTXId must have a non-null txid!"); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(128); |
| result.append("CommitProcessForTXIdMessage@").append(System.identityHashCode(this)) |
| .append(" txId=").append(this.txId); |
| return result.toString(); |
| } |
| } |
| |
| public abstract static class CommitProcessMessage extends PooledDistributionMessage { |
| protected void basicProcess(final TXCommitMessage mess, final ClusterDistributionManager dm) { |
| dm.removeMembershipListener(mess); |
| synchronized (mess) { |
| if (mess.dontProcess()) { |
| return; |
| } |
| } |
| try { |
| mess.basicProcess(); |
| } finally { |
| txTracker.processed(mess); |
| } |
| } |
| } |
| |
| /** |
| * The CommitProcessQueryMessage is used to attempt to recover - in the Distributed ACK TXs - when |
| * the origin of the CommitProcess messages departed from the distributed system. The sender of |
| * this message is attempting to query other potential fellow FarSiders (aka recipients) who may |
| * have received the CommitProcess message. |
| * |
| * Since the occurance of this message will be rare (hopefully), it was decided to be general |
| * about the the tracker key - opting not to have specific messages for each type like |
| * CommitProcessFor<Lock/TX>Id - and take the performance penalty of an extra call to |
| * DataSerializer |
| */ |
| public static class CommitProcessQueryMessage extends PooledDistributionMessage { |
| private Object trackerKey; // Either a TXLockId or a TXId |
| private int processorId; |
| |
| public CommitProcessQueryMessage() { |
| // Zero arg constructor for DataSerializer |
| } |
| |
| public CommitProcessQueryMessage(Object trackerKey, int processorId) { |
| this.trackerKey = trackerKey; |
| this.processorId = processorId; |
| } |
| |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| final boolean processMsgReceived = txTracker.commitProcessReceived(this.trackerKey); |
| if (!processMsgReceived) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CommitProcessQuery did not find {} in the history", this.trackerKey); |
| } |
| } |
| |
| // Reply to the fellow FarSider as to whether the |
| // CommitProcess message was received |
| CommitProcessQueryReplyMessage resp = new CommitProcessQueryReplyMessage(processMsgReceived); |
| resp.setProcessorId(this.processorId); |
| resp.setRecipient(this.getSender()); |
| dm.putOutgoing(resp); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| DataSerializer.writeObject(this.trackerKey, out); |
| out.writeInt(this.processorId); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return COMMIT_PROCESS_QUERY_MESSAGE; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| this.trackerKey = DataSerializer.readObject(in); |
| this.processorId = in.readInt(); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(128); |
| result.append("CommitProcessQueryMessage@").append(System.identityHashCode(this)) |
| .append(" trackerKeyClass=").append(this.trackerKey.getClass().getName()) |
| .append(" trackerKey=").append(this.trackerKey).append(" processorId=") |
| .append(this.processorId); |
| return result.toString(); |
| } |
| } |
| |
| /********************* Commit Process Query Response Message **********************************/ |
| public static class CommitProcessQueryReplyMessage extends ReplyMessage { |
| private boolean wasReceived; |
| |
| public CommitProcessQueryReplyMessage(boolean wasReceived) { |
| this.wasReceived = wasReceived; |
| } |
| |
| public CommitProcessQueryReplyMessage() { |
| // zero arg constructor for DataSerializer |
| } |
| |
| public boolean wasReceived() { |
| return wasReceived; |
| } |
| |
| @Override |
| public int getDSFID() { |
| return COMMIT_PROCESS_QUERY_REPLY_MESSAGE; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.wasReceived = in.readBoolean(); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeBoolean(this.wasReceived); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(128); |
| result.append("CommitProcessQueryReplyMessage@").append(System.identityHashCode(this)) |
| .append(" wasReceived=").append(this.wasReceived).append(" processorId=") |
| .append(this.processorId).append(" from ").append(this.getSender()); |
| return result.toString(); |
| } |
| } |
| |
| /********************* Commit Process Query Response Processor *********************************/ |
| public static class CommitProcessQueryReplyProcessor extends ReplyProcessor21 { |
| public boolean receivedOnePositive; |
| |
| CommitProcessQueryReplyProcessor(DistributionManager dm, Set members) { |
| super(dm, members); |
| this.receivedOnePositive = false; |
| } |
| |
| @Override |
| public void process(DistributionMessage msg) { |
| CommitProcessQueryReplyMessage ccMess = (CommitProcessQueryReplyMessage) msg; |
| if (ccMess.wasReceived()) { |
| this.receivedOnePositive = true; |
| } |
| super.process(msg); |
| } |
| |
| @Override |
| protected boolean canStopWaiting() { |
| return this.receivedOnePositive; |
| } |
| |
| public boolean receivedACommitProcessMessage() { |
| return this.receivedOnePositive; |
| } |
| } |
| |
| /********************* MembershipListener Implementation ***************************************/ |
| @Override |
| public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) { |
| // do nothing |
| } |
| |
| @Override |
| public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, |
| InternalDistributedMember whoSuspected, String reason) {} |
| |
| @Override |
| public void quorumLost(DistributionManager distributionManager, |
| Set<InternalDistributedMember> failures, |
| List<InternalDistributedMember> remaining) {} |
| |
| /** |
| * return true if the member initiating this transaction has left the cluster |
| */ |
| public boolean isDepartureNoticed() { |
| return departureNoticed; |
| } |
| |
| private void doOriginDepartedCommit() { |
| try { |
| // Set processor to zero to avoid the ack to the now departed origin |
| processorId = 0; |
| basicProcess(); |
| } finally { |
| txTracker.processed(this); |
| } |
| } |
| |
| @Override |
| public void memberDeparted(DistributionManager distributionManager, |
| final InternalDistributedMember id, boolean crashed) { |
| |
| if (!getSender().equals(id)) { |
| return; |
| } |
| distributionManager.removeMembershipListener(this); |
| |
| synchronized (this) { |
| if (isProcessing() || this.departureNoticed) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Member departed: Commit data is already being processed for lockid: {}", |
| lockId); |
| } |
| return; |
| } |
| this.departureNoticed = true; |
| } |
| |
| // Send message to fellow FarSiders (aka recipients), if any, to |
| // determine if any one of them have received a CommitProcessMessage |
| if (getFarSiders() != null && !getFarSiders().isEmpty()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Member departed: {} sending query for CommitProcess message to other recipients.", id); |
| } |
| |
| // Create a new thread, send the CommitProcessQuery, wait for a response and potentially |
| // process |
| // Should I use a thread pool?, Darrel suggests look in DM somewhere or introduce a zero |
| // sized thread pool |
| Thread fellowFarSidersQuery = new LoggingThread("CommitProcessQuery Thread", |
| () -> doCommitProcessQuery(id)); |
| fellowFarSidersQuery.start(); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Member departed: {}. Processing commit data.", getSender()); |
| } |
| |
| // Optimimal case where we are the only FarSider, assume we |
| // will never get the CommitProcess message, but it |
| // doesn't matter since we can commit anyway. |
| // Start a new thread to process the commit |
| Thread originDepartedCommit = new LoggingThread("Origin Departed Commit", |
| this::doOriginDepartedCommit); |
| originDepartedCommit.start(); |
| } |
| } |
| |
| HashSet getFarSiders() { |
| return farSiders; |
| } |
| |
| DistributionManager getDistributionManager() { |
| return dm; |
| } |
| |
| void doCommitProcessQuery(final InternalDistributedMember id) { |
| CommitProcessQueryReplyProcessor replyProcessor = createReplyProcessor(); |
| CommitProcessQueryMessage queryMessage = createQueryMessage(replyProcessor); |
| queryMessage.setRecipients(this.farSiders); |
| getDistributionManager().putOutgoing(queryMessage); |
| // Wait for any one positive response or all negative responses. |
| // (while() loop removed for bug 36983 - you can't loop on waitForReplies() |
| getDistributionManager().getCancelCriterion().checkCancelInProgress(null); |
| try { |
| replyProcessor.waitForRepliesUninterruptibly(); |
| } catch (ReplyException e) { |
| e.handleCause(); |
| } |
| if (replyProcessor.receivedACommitProcessMessage()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Transaction associated with lockID: {} from orign {} is processing due to a received \"commit process\" message", |
| lockId, id); |
| } |
| |
| try { |
| // Set processor to zero to avoid the ack to the now departed origin |
| processorId = 0; |
| basicProcess(); |
| } finally { |
| txTracker.processed(this); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Transaction associated with lockID: {} from origin {} ignored. No other recipients received \"commit process\" message", |
| lockId, id); |
| } |
| txTracker.removeMessage(this); |
| } |
| } |
| |
| CommitProcessQueryReplyProcessor createReplyProcessor() { |
| return new CommitProcessQueryReplyProcessor(dm, farSiders); |
| } |
| |
| CommitProcessQueryMessage createQueryMessage(CommitProcessQueryReplyProcessor replyProcessor) { |
| return new CommitProcessQueryMessage(getTrackerKey(), replyProcessor.getProcessorId()); |
| } |
| |
| private DistributedMember getMemberFromTrackerKey(Object trackerKey) { |
| if (trackerKey instanceof TXId) { |
| TXId id1 = (TXId) trackerKey; |
| return id1.getMemberId(); |
| } else if (trackerKey instanceof TXLockId) { |
| TXLockId id2 = (TXLockId) trackerKey; |
| return id2.getMemberId(); |
| } |
| return null; |
| } |
| |
| void setUpdateLockMembers() { |
| this.lockNeedsUpdate = true; |
| } |
| |
| /** |
| * Intended to be called after TXState.applyChanges when the potential for a different set of TX |
| * members has been determined and it is safe to ignore any new members because the changes have |
| * been applied to committed state. This was added as the solution to bug 32999 and the recovery |
| * when the TXLock Lessor (the sending VM) crashes/departs before or while sending the |
| * TXCommitMessage. |
| * |
| * @see TXState#commit() |
| * @see org.apache.geode.internal.cache.locks.TXLockBatch#getBatchId() |
| */ |
| private void updateLockMembers() { |
| if (this.lockNeedsUpdate && this.lockId != null) { |
| TXLockService.createDTLS(this.dm.getSystem()).updateParticipants(this.lockId, |
| this.msgMap.keySet()); |
| } |
| } |
| |
| /** |
| * Reply processor which collects all CommitReplyExceptions and emits a detailed failure exception |
| * if problems occur |
| * |
| * @since GemFire 5.7 |
| */ |
| private class CommitReplyProcessor extends ReliableReplyProcessor21 { |
| private HashMap msgMap; |
| |
| public CommitReplyProcessor(DistributionManager dm, Set initMembers, HashMap msgMap) { |
| super(dm, initMembers); |
| this.msgMap = msgMap; |
| } |
| |
| public void waitForCommitCompletion() { |
| try { |
| waitForRepliesUninterruptibly(); |
| } catch (CommitExceptionCollectingException e) { |
| e.handlePotentialCommitFailure(msgMap); |
| } |
| } |
| |
| @Override |
| protected void processException(DistributionMessage msg, ReplyException ex) { |
| if (msg instanceof ReplyMessage) { |
| synchronized (this) { |
| if (this.exception == null) { |
| // Exception Container |
| this.exception = new CommitExceptionCollectingException(txIdent); |
| } |
| CommitExceptionCollectingException cce = |
| (CommitExceptionCollectingException) this.exception; |
| if (ex instanceof CommitReplyException) { |
| CommitReplyException cre = (CommitReplyException) ex; |
| cce.addExceptionsFromMember(msg.getSender(), cre.getExceptions()); |
| } else { |
| cce.addExceptionsFromMember(msg.getSender(), Collections.singleton(ex)); |
| } |
| } |
| } |
| } |
| |
| @Override |
| protected boolean stopBecauseOfExceptions() { |
| return false; |
| } |
| |
| public Set getCacheClosedMembers() { |
| if (this.exception != null) { |
| CommitExceptionCollectingException cce = |
| (CommitExceptionCollectingException) this.exception; |
| return cce.getCacheClosedMembers(); |
| } else { |
| return Collections.emptySet(); |
| } |
| } |
| |
| public Set getRegionDestroyedMembers(String regionFullPath) { |
| if (this.exception != null) { |
| CommitExceptionCollectingException cce = |
| (CommitExceptionCollectingException) this.exception; |
| return cce.getRegionDestroyedMembers(regionFullPath); |
| } else { |
| return Collections.emptySet(); |
| } |
| } |
| } |
| |
| /** |
| * An Exception that collects many remote CommitExceptions |
| * |
| * @since GemFire 5.7 |
| */ |
| public static class CommitExceptionCollectingException extends ReplyException { |
| private static final long serialVersionUID = 589384721273797822L; |
| /** |
| * Set of members that threw CacheClosedExceptions |
| */ |
| private final Set<InternalDistributedMember> cacheExceptions; |
| /** |
| * key=region path, value=Set of members |
| */ |
| private final Map<String, Set<InternalDistributedMember>> regionExceptions; |
| /** |
| * List of exceptions that were unexpected and caused the tx to fail |
| */ |
| private final Map fatalExceptions; |
| |
| private final TXId id; |
| |
| public CommitExceptionCollectingException(TXId txIdent) { |
| this.cacheExceptions = new HashSet(); |
| this.regionExceptions = new HashMap(); |
| this.fatalExceptions = new HashMap(); |
| this.id = txIdent; |
| } |
| |
| /** |
| * Determine if the commit processing was incomplete, if so throw a detailed exception |
| * indicating the source of the problem |
| */ |
| public void handlePotentialCommitFailure( |
| HashMap<InternalDistributedMember, RegionCommitList> msgMap) { |
| if (fatalExceptions.size() > 0) { |
| StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ") |
| .append(id).append(". Caused by the following exceptions: "); |
| for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { |
| Map.Entry me = (Map.Entry) i.next(); |
| DistributedMember mem = (DistributedMember) me.getKey(); |
| errorMessage.append(" From member: ").append(mem).append(" "); |
| List exceptions = (List) me.getValue(); |
| for (Iterator ei = exceptions.iterator(); ei.hasNext();) { |
| Exception e = (Exception) ei.next(); |
| errorMessage.append(e); |
| for (StackTraceElement ste : e.getStackTrace()) { |
| errorMessage.append("\n\tat ").append(ste); |
| } |
| if (ei.hasNext()) { |
| errorMessage.append("\nAND\n"); |
| } |
| } |
| errorMessage.append("."); |
| } |
| throw new CommitIncompleteException(errorMessage.toString()); |
| } |
| |
| // Mark any persistent members as offline |
| handleClosedMembers(msgMap); |
| handleRegionDestroyed(msgMap); |
| } |
| |
| /** |
| * Mark peers as offline for regions that the peer returned a RegionDestroyedException |
| */ |
| private void handleRegionDestroyed( |
| HashMap<InternalDistributedMember, RegionCommitList> msgMap) { |
| if (regionExceptions == null || regionExceptions.isEmpty()) { |
| return; |
| } |
| |
| for (Map.Entry<InternalDistributedMember, RegionCommitList> memberMap : msgMap.entrySet()) { |
| InternalDistributedMember member = memberMap.getKey(); |
| RegionCommitList rcl = memberMap.getValue(); |
| for (RegionCommit region : rcl) { |
| Set<InternalDistributedMember> failedMembers = |
| regionExceptions.get(region.internalRegion.getFullPath()); |
| if (failedMembers != null && failedMembers.contains(member)) { |
| markMemberOffline(member, region); |
| } |
| } |
| } |
| |
| } |
| |
| /** |
| * Mark peers as offline that returned a cache closed exception |
| */ |
| private void handleClosedMembers(HashMap<InternalDistributedMember, RegionCommitList> msgMap) { |
| for (InternalDistributedMember member : getCacheClosedMembers()) { |
| RegionCommitList rcl = msgMap.get(member); |
| |
| for (RegionCommit region : rcl) { |
| markMemberOffline(member, region); |
| } |
| } |
| } |
| |
| private void markMemberOffline(InternalDistributedMember member, RegionCommit region) { |
| if (region.persistentIds == null) { |
| return; |
| } |
| |
| PersistentMemberID persistentId = region.persistentIds.get(member); |
| /// iterate over the list and mark the members offline |
| if (persistentId != null) { |
| // Fix for bug 42142 - In order for recovery to work, |
| // we must either |
| // 1) persistent the region operation successfully on the peer |
| // 2) record that the peer is offline |
| // or |
| // 3) fail the operation |
| |
| // if we have started to shutdown, we don't want to mark the peer |
| // as offline, or we will think we have newer data when in fact we don't |
| region.internalRegion.getCancelCriterion().checkCancelInProgress(null); |
| |
| // Otherwise, mark the peer as offline, because it didn't complete |
| // the operation. |
| ((DistributedRegion) region.internalRegion).getPersistenceAdvisor().markMemberOffline( |
| member, |
| persistentId); |
| } |
| } |
| |
| public Set<InternalDistributedMember> getCacheClosedMembers() { |
| return this.cacheExceptions; |
| } |
| |
| public Set getRegionDestroyedMembers(String regionFullPath) { |
| Set members = (Set) this.regionExceptions.get(regionFullPath); |
| if (members == null) { |
| members = Collections.emptySet(); |
| } |
| return members; |
| } |
| |
| /** |
| * Protected by (this) |
| */ |
| public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { |
| for (Iterator iter = exceptions.iterator(); iter.hasNext();) { |
| Exception ex = (Exception) iter.next(); |
| if (ex instanceof CancelException) { |
| cacheExceptions.add(member); |
| } else if (ex instanceof RegionDestroyedException) { |
| String r = ((RegionDestroyedException) ex).getRegionFullPath(); |
| Set<InternalDistributedMember> members = regionExceptions.get(r); |
| if (members == null) { |
| members = new HashSet(); |
| regionExceptions.put(r, members); |
| } |
| members.add(member); |
| } else { |
| List el = (List) this.fatalExceptions.get(member); |
| if (el == null) { |
| el = new ArrayList(2); |
| this.fatalExceptions.put(member, el); |
| } |
| el.add(ex); |
| } |
| } |
| } |
| } |
| |
| public void hookupRegions(DistributionManager dm) { |
| if (regions != null) { |
| Iterator it = regions.iterator(); |
| while (it.hasNext()) { |
| RegionCommit rc = (RegionCommit) it.next(); |
| rc.hookupRegion(dm); |
| } |
| } |
| |
| } |
| |
| |
| /** |
| * Disable firing of TX Listeners. Currently on used on clients. |
| * |
| * @param b disable the listeners |
| */ |
| public void setDisableListeners(boolean b) { |
| disableListeners = true; |
| } |
| |
| public Version getClientVersion() { |
| return clientVersion; |
| } |
| |
| public void setClientVersion(Version clientVersion) { |
| this.clientVersion = clientVersion; |
| } |
| |
| } |