| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *======================================================================== |
| */ |
| |
| package com.gemstone.gemfire.internal.cache; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.IdentityHashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.DataSerializer; |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheClosedException; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.CacheRuntimeException; |
| import com.gemstone.gemfire.cache.CommitDistributionException; |
| import com.gemstone.gemfire.cache.CommitIncompleteException; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.Operation; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.RegionDistributionException; |
| import com.gemstone.gemfire.cache.TransactionId; |
| import com.gemstone.gemfire.cache.TransactionListener; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.internal.DM; |
| import com.gemstone.gemfire.distributed.internal.DistributionManager; |
| import com.gemstone.gemfire.distributed.internal.DistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.MembershipListener; |
| import com.gemstone.gemfire.distributed.internal.MessageWithReply; |
| import com.gemstone.gemfire.distributed.internal.PooledDistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.ReliableReplyProcessor21; |
| import com.gemstone.gemfire.distributed.internal.ReplyException; |
| import com.gemstone.gemfire.distributed.internal.ReplyMessage; |
| import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.Assert; |
| import com.gemstone.gemfire.internal.HeapDataOutputStream; |
| import com.gemstone.gemfire.internal.InternalDataSerializer; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.locks.TXLockId; |
| import com.gemstone.gemfire.internal.cache.locks.TXLockIdImpl; |
| import com.gemstone.gemfire.internal.cache.locks.TXLockService; |
| import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import com.gemstone.gemfire.internal.cache.versions.VersionSource; |
| import com.gemstone.gemfire.internal.cache.versions.VersionTag; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| |
| /** TXCommitMessage is the message that contains all the information |
| * that needs to be distributed, on commit, to other cache members. |
| * |
| * @author Darrel Schneider |
| * @since 4.0 |
| * |
| */ |
| public final class TXCommitMessage extends PooledDistributionMessage implements MembershipListener, MessageWithReply |
| { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| // Keep a 60 second history @ an estimated 1092 transactions/second ~= 16^4 |
| protected static final TXFarSideCMTracker txTracker = new TXFarSideCMTracker((60 * 1092)); |
| |
| private ArrayList regions; // list of RegionCommit instances |
| protected TXId txIdent; |
| protected int processorId; // 0 when no ack is needed for the current send; set to -1 when acks are disabled |
| protected TXLockIdImpl lockId; |
| protected HashSet farSiders; |
| |
| protected transient DM dm; // Used on the sending side of this message; also assigned on the receiving side in process() |
| private transient int sequenceNum = 0; |
| private transient HashMap<InternalDistributedMember, RegionCommitList> msgMap = null; // Maps each recipient member to its RegionCommitList |
| private transient RegionCommit currentRegion; |
| protected transient TXState txState = null; |
| private transient boolean wasProcessed; |
| private transient boolean isProcessing; |
| private transient boolean dontProcess; |
| private transient boolean departureNoticed = false; |
| private transient boolean lockNeedsUpdate = false; |
| private transient boolean ackRequired = true; |
| /** |
| * List of operations to do when processing this tx. |
| * Valid on farside only. |
| */ |
| private transient ArrayList farSideEntryOps; |
| private transient byte[] farsideBaseMembershipId; // only available on farside |
| private transient long farsideBaseThreadId; // only available on farside |
| private transient long farsideBaseSequenceId; // only available on farside |
| |
| /** (Nearside) true if any regions in this TX have required roles */ |
| private transient boolean hasReliableRegions = false; |
| |
| /** Set of all caching exceptions produced while processing this tx */ |
| private transient Set processingExceptions = Collections.EMPTY_SET; |
| |
| private transient ClientProxyMembershipID bridgeContext = null; |
| |
| /** |
| * Version of the client that this TXCommitMessage is being sent to. |
| * Used for backwards compatibility |
| */ |
| private transient Version clientVersion; |
| |
| /** |
| * A token to be put in TXManagerImpl#failoverMap to represent a |
| * CommitConflictException while committing a transaction |
| */ |
| public static final TXCommitMessage CMT_CONFLICT_MSG = new TXCommitMessage(); |
| /** |
| * A token to be put in TXManagerImpl#failoverMap to represent a |
| * TransactionDataNodeHasDepartedException while committing a transaction |
| */ |
| public static final TXCommitMessage REBALANCE_MSG = new TXCommitMessage(); |
| /** |
| * A token to be put in TXManagerImpl#failoverMap to represent an |
| * exception while committing a transaction |
| */ |
| public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage(); |
| /* |
| /** |
| * this is set if this message should deserialize the WAN shadowKey |
| * sent by the sender. Sender will not send shadowKeys when there is |
| * a mix of 7.0 and 7.0.1 members |
| * |
| private transient boolean shouldReadShadowKey; |
| /** |
| * this is set if the sender has decided to send WAN shadowKey |
| * for 7.0.1 members |
| * |
| private transient boolean shouldWriteShadowKey; |
| */ |
| |
| /** |
|  * Creates a commit message for the sending side of a transaction. |
|  * |
|  * @param txIdent identifier of the transaction being committed |
|  * @param dm distribution manager later used to send the message |
|  * @param txState transaction state; must not be null because its |
|  *        bridgeContext is read here |
|  */ |
| public TXCommitMessage(TXId txIdent, DM dm, TXState txState) { |
|   // collaborators handed in by the caller |
|   this.txIdent = txIdent; |
|   this.dm = dm; |
|   this.txState = txState; |
|   // nothing has been collected or sent yet |
|   this.regions = null; |
|   this.lockId = null; |
|   this.farSiders = null; |
|   // processing flags all start out cleared |
|   this.isProcessing = false; |
|   this.wasProcessed = false; |
|   this.dontProcess = false; |
|   this.bridgeContext = txState.bridgeContext; |
| } |
| /** |
|  * Zero-argument constructor for DataSerializer; every field keeps its |
|  * declared default. |
|  */ |
| public TXCommitMessage() { |
| } |
| |
| /** |
|  * @return the single static far-side tracker shared by all commit messages |
|  */ |
| public static TXFarSideCMTracker getTracker() { |
|   return txTracker; |
| } |
| |
| /** |
| * Create and return an eventId given its offset. |
| * @since 5.7 |
| */ |
| protected EventID getEventId(int eventOffset) { |
| return new EventID(this.farsideBaseMembershipId, |
| this.farsideBaseThreadId, |
| this.farsideBaseSequenceId + eventOffset); |
| } |
| |
| /** |
|  * Looks up the already-received TXCommitMessage associated with |
|  * {@code id} by delegating to the shared tracker. Because of bug 37657 |
|  * the message may not have arrived yet, so the call may need to wait. |
|  * |
|  * @param id key identifying the commit message |
|  * @param dm distribution manager handed through to the tracker |
|  * @return whatever the tracker's waitForMessage returns for {@code id} |
|  */ |
| public static TXCommitMessage waitForMessage(Object id, DM dm) { |
|   return getTracker().waitForMessage(id, dm); |
| } |
| |
| void startRegion(LocalRegion r, int maxSize) { |
| this.currentRegion = new RegionCommit(this, r, maxSize); |
| if (r.requiresReliabilityCheck()) { |
| this.hasReliableRegions = true; |
| } |
| } |
| void finishRegion(Set<InternalDistributedMember> s) { |
| // make sure we have some changes and someone to send them to |
| if (!this.currentRegion.isEmpty() && s != null && !s.isEmpty()) { |
| //Get the persistent ids for the current region and save them |
| Map<InternalDistributedMember, PersistentMemberID> persistentIds = getPersistentIds(this.currentRegion.r); |
| this.currentRegion.persistentIds = persistentIds; |
| |
| if (this.msgMap == null) { |
| this.msgMap = new HashMap<InternalDistributedMember, RegionCommitList>(); |
| } |
| { |
| RegionCommitList newRCL = null; |
| Iterator<InternalDistributedMember> it = s.iterator(); |
| while (it.hasNext()) { |
| InternalDistributedMember recipient = it.next(); |
| |
| if (!this.dm.getDistributionManagerIds().contains(recipient)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Skipping member {} due to dist list absence", recipient); |
| } |
| // skip this guy since the dm no longer knows about him |
| continue; |
| } |
| RegionCommitList rcl = this.msgMap.get(recipient); |
| if (rcl == null) { |
| if (newRCL == null) { |
| rcl = new RegionCommitList(); |
| rcl.add(this.currentRegion); |
| newRCL = rcl; |
| } else { |
| rcl = newRCL; |
| } |
| this.msgMap.put(recipient, rcl); |
| } else if (rcl.get(rcl.size()-1) != this.currentRegion) { |
| rcl.add(this.currentRegion); |
| } |
| } |
| } |
| { // Now deal with each existing recipient that does not care |
| // about this region |
| Iterator<Map.Entry<InternalDistributedMember, RegionCommitList>> it = this.msgMap.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<InternalDistributedMember, RegionCommitList> me = it.next(); |
| if (!s.contains(me.getKey())) { |
| RegionCommitList rcl = me.getValue(); |
| RegionCommitList trimmedRcl = rcl.trim(this.currentRegion); |
| if (trimmedRcl != rcl) { |
| me.setValue(trimmedRcl); |
| } |
| } |
| } |
| } |
| } |
| this.currentRegion = null; |
| } |
| |
| |
| |
| private Map<InternalDistributedMember, PersistentMemberID> getPersistentIds(LocalRegion r) { |
| if(r instanceof DistributedRegion) { |
| return ((DistributedRegion) r).getCacheDistributionAdvisor().advisePersistentMembers(); |
| } else { |
| return Collections.EMPTY_MAP; |
| } |
| } |
| |
| void finishRegionComplete() { |
| // make sure we have some changes and someone to send them to |
| if (!this.currentRegion.isEmpty()) { |
| { |
| if(this.regions==null) { |
| this.regions = new RegionCommitList(); |
| } |
| this.regions.add(this.currentRegion); |
| } |
| } |
| this.currentRegion = null; |
| } |
| |
| |
| // DistributedRegion -> Long version; filled by addViewVersion and read by releaseViewVersions |
| Map viewVersions = new HashMap(); |
| |
| // Set from the stream in fromData; written by toData when there is no local txState |
| private Boolean needsLargeModCount; |
| |
| // When true, basicProcess does not invoke TransactionListener.afterCommit |
| private transient boolean disableListeners = false; |
| |
| |
| /** record CacheDistributionAdvisor.startOperation versions for later cleanup */ |
| protected void addViewVersion(DistributedRegion dr, long version) { |
| viewVersions.put(dr, Long.valueOf(version)); |
| } |
| /** |
|  * Calls endOperation on the distribution advisor of every region recorded |
|  * through addViewVersion, passing the version stored for that region. |
|  * Every entry is attempted even when an earlier one fails, because one |
|  * region being destroyed does not mean the others are. |
|  * |
|  * @throws RuntimeException the last failure raised while ending an |
|  *         operation; earlier failures are overwritten, not chained |
|  */ |
| protected void releaseViewVersions() { |
|   RuntimeException lastFailure = null; |
|   for (Iterator it = viewVersions.entrySet().iterator(); it.hasNext(); ) { |
|     Map.Entry e = (Map.Entry) it.next(); |
|     DistributedRegion dr = (DistributedRegion) e.getKey(); |
|     Long viewVersion = (Long) e.getValue(); |
|     try { |
|       // the value returned by endOperation is not needed here |
|       dr.getDistributionAdvisor().endOperation(viewVersion.longValue()); |
|     } catch (RuntimeException ex) { |
|       // remember the failure but keep releasing the remaining regions |
|       lastFailure = ex; |
|     } |
|   } |
|   if (lastFailure != null) { |
|     throw lastFailure; |
|   } |
| } |
| |
| /** |
|  * @return true when no recipient has been recorded in msgMap |
|  */ |
| private boolean isEmpty() { |
|   if (this.msgMap == null) { |
|     return true; |
|   } |
|   return this.msgMap.isEmpty(); |
| } |
| void addOp(LocalRegion r,Object key, TXEntryState entry,Set otherRecipients) { |
| this.currentRegion.addOp(key, entry); |
| } |
| /** |
| * Distributes this commit to the recipients recorded in msgMap. |
| * Returns without sending when msgMap is null or empty. Recipients are |
| * grouped by the identity of their RegionCommitList so each distinct |
| * list is sent once. When at least one list needs an ack, a |
| * CommitReplyProcessor is created, a CommitProcessMessage is sent to the |
| * ack receivers, and this method waits for commit completion. When any |
| * region required a reliability check, checkDistributionReliability runs |
| * last. |
| * |
| * @param lockId cast to TXLockIdImpl and stored; when null the |
| * CommitProcessMessage is keyed by txIdent instead |
| */ |
| void send(TXLockId lockId) { |
| if (isEmpty()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("empty transaction - nothing to distribute"); |
| } |
| return; |
| } |
| Assert.assertTrue(this.txState!=null, "Send must have transaction state."); |
| this.lockId = (TXLockIdImpl)lockId; |
| updateLockMembers(); |
| |
| IdentityHashMap distMap = new IdentityHashMap(); // Map of RegionCommitList keys to Sets of receivers |
| HashSet ackReceivers = null; |
| { |
| // Invert msgMap: collect the members per RegionCommitList, and |
| // separately every member whose list needs an ack. |
| Iterator it = this.msgMap.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry me = (Map.Entry)it.next(); |
| RegionCommitList rcl = (RegionCommitList)me.getValue(); |
| if (rcl.getNeedsAck()) { |
| if (ackReceivers == null) { |
| ackReceivers = new HashSet(); |
| } |
| ackReceivers.add(me.getKey()); |
| } |
| HashSet receivers = (HashSet)distMap.get(rcl); |
| if (receivers == null) { |
| receivers = new HashSet(); |
| distMap.put(rcl, receivers); |
| } |
| receivers.add(me.getKey()); |
| } |
| } |
| |
| // Determine if the set of VMs for any of the Regions for this TX have changed |
| HashSet recips = new HashSet(this.msgMap.keySet()); |
| DistributedRegion dr = null; |
| Iterator rI = this.txState.getRegions().iterator(); |
| CommitReplyProcessor processor = null; |
| { |
| while(rI.hasNext()) { |
| LocalRegion lr = (LocalRegion) rI.next(); |
| if (lr.getScope().isLocal()) { |
| continue; |
| } |
| dr = (DistributedRegion) lr; |
| CacheDistributionAdvisor adv = dr.getCacheDistributionAdvisor(); |
| Set newRegionMemberView = adv.adviseTX(); |
| |
| // Members missing from the original recipient set are only logged |
| // here; this loop does not add them to any recipient set. |
| if (! recips.containsAll(newRegionMemberView)) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.TXCommitMessage_NEW_MEMBERS_FOR_REGION_0_ORIG_LIST_1_NEW_LIST_2, |
| new Object[] {dr, recips, newRegionMemberView})); |
| } |
| } |
| |
| if (ackReceivers != null) { |
| processor = new CommitReplyProcessor(this.dm, ackReceivers, msgMap); |
| // farSiders is only set when more than one member has to ack |
| if (ackReceivers.size() > 1) { |
| this.farSiders = ackReceivers; |
| } |
| processor.enableSevereAlertProcessing(); |
| } |
| { |
| Iterator it = distMap.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry me = (Map.Entry)it.next(); |
| RegionCommitList rcl = (RegionCommitList)me.getKey(); |
| HashSet recipients = (HashSet)me.getValue(); |
| // now remove from the recipients any guys that the dm no |
| // longer knows about |
| recipients.retainAll(this.dm.getDistributionManagerIds()); |
| if (!recipients.isEmpty()) { |
| if (this.txState.internalDuringIndividualSend != null) { |
| // Run in test mode, splitting out individual recipients, |
| // so we can control who gets what |
| Iterator indivRecip = recipients.iterator(); |
| while(indivRecip.hasNext()) { |
| this.txState.internalDuringIndividualSend.run(); |
| setRecipientsSendData(Collections.singleton(indivRecip.next()), processor, rcl); |
| } |
| } else { |
| // Run in normal mode sending to multiple recipients in |
| // one shot |
| setRecipientsSendData(recipients, processor, rcl); |
| } |
| } |
| } |
| } |
| if (this.txState.internalAfterIndividualSend != null) { |
| this.txState.internalAfterIndividualSend.run(); |
| } |
| } |
| |
| // processor is non-null only when some recipient has to ack |
| if (processor != null) { |
| // Send the CommitProcessMessage |
| final CommitProcessMessage cpMsg; |
| if (this.lockId != null) { |
| cpMsg = new CommitProcessForLockIdMessage(this.lockId); |
| } else { |
| cpMsg = new CommitProcessForTXIdMessage(this.txIdent); |
| } |
| if (this.txState.internalDuringIndividualCommitProcess != null) { |
| // Run in test mode |
| Iterator<InternalDistributedMember> indivRecip = ackReceivers.iterator(); |
| while(indivRecip.hasNext()) { |
| this.txState.internalDuringIndividualCommitProcess.run(); |
| cpMsg.setRecipients(Collections.<InternalDistributedMember>singleton(indivRecip.next())); |
| this.dm.putOutgoing(cpMsg); |
| cpMsg.resetRecipients(); |
| } |
| } else { |
| // Run in normal mode |
| cpMsg.setRecipients(ackReceivers); |
| this.dm.putOutgoing(cpMsg); |
| } |
| |
| if (this.txState.internalAfterIndividualCommitProcess != null) { |
| // Testing callback |
| this.txState.internalAfterIndividualCommitProcess.run(); |
| } |
| |
| // for() loop removed for bug 36983 - you can't loop on waitForReplies() |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| processor.waitForCommitCompletion(); |
| this.dm.getStats().incCommitWaits(); |
| } |
| if (this.hasReliableRegions) { |
| checkDistributionReliability(distMap, processor); |
| } |
| } |
| |
| /** |
|  * @return always true for a commit message |
|  */ |
| @Override |
| public boolean containsRegionContentChange() { |
|   return true; |
| } |
| |
| /** |
| * Checks reliable regions and throws CommitDistributionException if any |
| * required roles may not have received the commit message. |
| * |
| * @param distMap map of RegionCommitList keys to Sets of receivers |
| * @param processor the reply processor |
| * @throws CommitDistributionException if any required roles may not have |
| * received the commit message |
| */ |
| private void checkDistributionReliability(Map distMap, CommitReplyProcessor processor) { |
| // key=RegionCommit, value=Set of recipients |
| Map regionToRecipients = new IdentityHashMap(); |
| |
| // build up the keys in regionToRecipients and add all receivers |
| for (Iterator distIter = distMap.entrySet().iterator(); distIter.hasNext();) { |
| Map.Entry me = (Map.Entry) distIter.next(); |
| RegionCommitList rcl = (RegionCommitList) me.getKey(); |
| Set recipients = (Set) me.getValue(); |
| |
| for (Iterator rclIter = rcl.iterator(); rclIter.hasNext();) { |
| RegionCommit rc = (RegionCommit) rclIter.next(); |
| // skip region if no required roles |
| if (!rc.r.requiresReliabilityCheck()) { |
| continue; |
| } |
| |
| Set recipientsForRegion = (Set) regionToRecipients.get(rc); |
| if (recipientsForRegion == null) { |
| recipientsForRegion = new HashSet(); |
| regionToRecipients.put(rc, recipientsForRegion); |
| } |
| |
| // get the receiver Set for rcl and perform addAll |
| if (recipients != null) { |
| recipientsForRegion.addAll(recipients); |
| } |
| } |
| } |
| |
| Set cacheClosedMembers = (processor == null) ? Collections.EMPTY_SET : |
| processor.getCacheClosedMembers(); |
| Set departedMembers = (processor == null) ? Collections.EMPTY_SET : |
| processor.getDepartedMembers(); |
| |
| // check reliability on each region |
| Set regionDistributionExceptions = Collections.EMPTY_SET; |
| Set failedRegionNames = Collections.EMPTY_SET; |
| for (Iterator iter = regionToRecipients.entrySet().iterator(); iter.hasNext();) { |
| Map.Entry me = (Map.Entry) iter.next(); |
| final RegionCommit rc = (RegionCommit) me.getKey(); |
| |
| final Set successfulRecipients = new HashSet(msgMap.keySet()); |
| successfulRecipients.removeAll(departedMembers); |
| |
| // remove members who destroyed that region or closed their cache |
| Set regionDestroyedMembers = (processor == null) ? Collections.EMPTY_SET : |
| processor.getRegionDestroyedMembers(rc.r.getFullPath()); |
| |
| successfulRecipients.removeAll(cacheClosedMembers); |
| successfulRecipients.removeAll(regionDestroyedMembers); |
| |
| try { |
| ReliableDistributionData rdd = new ReliableDistributionData() { |
| // public Set getSuccessfulRecipients(ReliableReplyProcessor21 processor) { |
| // return successfulRecipients; |
| // } |
| public int getOperationCount() { |
| return rc.getOperationCount(); |
| } |
| public List getOperations() { |
| return rc.getOperations(); |
| } |
| }; |
| rc.r.handleReliableDistribution(rdd, successfulRecipients); |
| } |
| catch (RegionDistributionException e) { |
| if (regionDistributionExceptions == Collections.EMPTY_SET) { |
| regionDistributionExceptions = new HashSet(); |
| failedRegionNames = new HashSet(); |
| } |
| regionDistributionExceptions.add(e); |
| failedRegionNames.add(rc.r.getFullPath()); |
| } |
| } |
| |
| if (!regionDistributionExceptions.isEmpty()) { |
| throw new CommitDistributionException(LocalizedStrings.TXCommitMessage_THESE_REGIONS_EXPERIENCED_RELIABILITY_FAILURE_DURING_DISTRIBUTION_OF_THE_OPERATION_0.toLocalizedString(failedRegionNames), regionDistributionExceptions); |
| } |
| } |
| |
| |
| /** |
|  * Helper for send: points this message at {@code recipients} with |
|  * {@code rcl} as its region list, puts it on the outgoing queue and then |
|  * resets the recipients. |
|  * |
|  * @param recipients members that receive this copy of the message |
|  * @param processor reply processor; only dereferenced when rcl needs an ack |
|  * @param rcl the region commits to ship |
|  */ |
| private final void setRecipientsSendData(Set recipients, |
|     ReplyProcessor21 processor, |
|     RegionCommitList rcl) { |
|   setRecipients(recipients); |
|   this.regions = rcl; |
|   // a processor id of 0 means no reply is expected for this send |
|   this.processorId = rcl.getNeedsAck() ? processor.getProcessorId() : 0; |
|   this.dm.getStats().incSentCommitMessages(1L); |
|   this.sequenceNum++; |
|   this.dm.putOutgoing(this); |
|   resetRecipients(); |
| } |
| |
| /** |
|  * Entry point on the receiving side. With a processor id of 0 the commit |
|  * is applied immediately. Otherwise the message is registered with the |
|  * tracker and waits for a CommitProcess message; if the sender is no |
|  * longer a known member, it is treated as departed straight away. |
|  * |
|  * @param dm the distribution manager that delivered this message |
|  */ |
| @Override |
| protected void process(DistributionManager dm) { |
|   this.dm = dm; |
|   // this node is not one of its own far siders |
|   if (this.farSiders != null) { |
|     this.farSiders.remove(dm.getId()); |
|   } |
|   if (this.processorId == 0) { |
|     basicProcess(); |
|     return; |
|   } |
|   TXLockService.createDTLS(); // fix bug 38843; no-op if already created |
|   synchronized (this) { |
|     // listen for membership changes so a departing origin is noticed |
|     this.dm.addMembershipListener(this); |
|     // ACK mode: park the message until the CommitProcess message arrives |
|     if (logger.isDebugEnabled()) { |
|       final Object key = getTrackerKey(); |
|       logger.debug("Adding key:{} class{} to tracker list", key, key.getClass().getName()); |
|     } |
|     txTracker.add(this); |
|   } |
|   final boolean senderKnown = this.dm.getDistributionManagerIds().contains(getSender()); |
|   if (!senderKnown) { |
|     memberDeparted(getSender(), false /*don't care*/); |
|   } |
| } |
| /** |
| * Adds an entry op for this tx to do on the far side |
| */ |
| void addFarSideEntryOp(RegionCommit.FarSideEntryOp entryOp) { |
| this.farSideEntryOps.add(entryOp); |
| } |
| |
| /** |
|  * Records an exception raised while processing this transaction. A fresh |
|  * set is started, discarding anything recorded before, when nothing has |
|  * been recorded yet or when {@code e} is a CancelException. |
|  * |
|  * @param e the exception to record |
|  */ |
| protected final void addProcessingException(Exception e) { |
|   final boolean startFresh = |
|       this.processingExceptions == Collections.EMPTY_SET || e instanceof CancelException; |
|   if (startFresh) { |
|     this.processingExceptions = new HashSet(); |
|   } |
|   this.processingExceptions.add(e); |
| } |
| |
| |
| /** |
|  * Replaces the distribution manager used by this message. |
|  * |
|  * @param dm the distribution manager to use from now on |
|  */ |
| public void setDM(DM dm) { |
|   this.dm = dm; |
| } |
| |
| /** |
|  * Returns the distribution manager. When none is set yet, it is taken |
|  * from the existing cache and remembered. |
|  * |
|  * @return the distribution manager |
|  */ |
| public DM getDM() { |
|   if (this.dm != null) { |
|     return this.dm; |
|   } |
|   final GemFireCacheImpl existingCache = GemFireCacheImpl.getExisting("Applying TXCommit"); |
|   this.dm = existingCache.getDistributionManager(); |
|   return this.dm; |
| } |
| |
| /** |
| * Applies this commit: runs beginProcess on each RegionCommit, processes |
| * the far-side entry operations, runs endProcess on each remaining |
| * RegionCommit, and then fires TransactionListener.afterCommit when |
| * appropriate. A second call while the message is already marked as |
| * processing returns immediately. In every case where processing was |
| * started, the outer finally block restores the thread init level, sends |
| * the ack if one is required, and saves the message for client failover |
| * when the cache is not a client. |
| */ |
| public void basicProcess() { |
| final DM dm = this.dm; |
| |
| // Check-and-set under the lock so only one caller gets past this point |
| synchronized(this) { |
| if (isProcessing()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("TXCommitMessage {} is already in process, returning", this); |
| } |
| return; |
| } else { |
| setIsProcessing(true); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("begin processing TXCommitMessage for {}", this.txIdent); |
| } |
| final int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); // do this before CacheFactory.getInstance for bug 33471 |
| boolean forceListener = false; // this gets flipped if we need to fire tx listener |
| // it needs to default to false because we don't want to fire listeners on pr replicates |
| try { |
| TXRmtEvent txEvent = null; |
| final Cache cache = CacheFactory.getInstance(dm.getSystem()); |
| if(cache==null) { |
| addProcessingException(new CacheClosedException()); |
| // return ... this cache is closed so we can't do anything. |
| // NOTE(review): the outer finally still runs and calls |
| // GemFireCacheImpl.getExisting; confirm how that behaves with no cache. |
| return; |
| } |
| // The event object is only created when at least one listener is registered |
| final TransactionListener[] tls = cache.getCacheTransactionManager().getListeners(); |
| if (tls.length > 0) { |
| txEvent = new TXRmtEvent(this.txIdent, cache); |
| } |
| try { |
| // Pre-process each Region in the tx |
| try { |
| { |
| Iterator it = this.regions.iterator(); |
| while (it.hasNext()) { |
| boolean failedBeginProcess = true; |
| RegionCommit rc = (RegionCommit)it.next(); |
| try { |
| failedBeginProcess = !rc.beginProcess(dm, this.txIdent, txEvent); |
| } catch (CacheRuntimeException problem) { |
| processCacheRuntimeException(problem); |
| } finally { |
| // Still true here if beginProcess returned false or threw |
| if (failedBeginProcess) { |
| rc.r = null; // Cause related FarSideEntryOps to skip processing |
| it.remove(); // Skip endProcessing as well |
| } |
| } |
| } |
| } |
| basicProcessOps(); |
| } finally { // fix for bug 40001 |
| // post-process each Region in the tx |
| Iterator it = this.regions.iterator(); |
| while (it.hasNext()) { |
| try { |
| RegionCommit rc = (RegionCommit)it.next(); |
| rc.endProcess(); |
| if(rc.isForceFireEvent(dm)) { |
| forceListener = true; |
| } |
| } catch (CacheRuntimeException problem) { |
| processCacheRuntimeException(problem); |
| } |
| } |
| } |
| |
| /* |
| * We need to make sure that we should fire a TX afterCommit event. |
| */ |
| if(!disableListeners && (forceListener || (txEvent!=null && txEvent.getEvents().size()>0))) { |
| for (int i=0; i < tls.length; i++) { |
| try { |
| tls[i].afterCommit(txEvent); |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.error(LocalizedMessage.create(LocalizedStrings.TXCommitMessage_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t); |
| } |
| } |
| } |
| } catch(CancelException e) { |
| processCacheRuntimeException(e); |
| } finally { |
| if (txEvent != null) { |
| txEvent.freeOffHeapResources(); |
| } |
| } |
| } |
| finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| if(isAckRequired()) { |
| ack(); |
| } |
| if (!GemFireCacheImpl.getExisting("Applying TXCommitMessage").isClient()) { |
| getTracker().saveTXForClientFailover(txIdent, this); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("completed processing TXCommitMessage for {}", this.txIdent); |
| } |
| } |
| } |
| |
| /** |
|  * Sorts the far-side entry operations and processes them in that order. |
|  * A failing operation does not stop the remaining ones: cache runtime |
|  * exceptions go to processCacheRuntimeException and any other exception |
|  * is recorded as a processing exception. |
|  */ |
| public void basicProcessOps() { |
|   Collections.sort(this.farSideEntryOps); |
|   for (Iterator ops = this.farSideEntryOps.iterator(); ops.hasNext(); ) { |
|     try { |
|       final RegionCommit.FarSideEntryOp op = (RegionCommit.FarSideEntryOp) ops.next(); |
|       op.process(); |
|     } catch (CacheRuntimeException cacheProblem) { |
|       processCacheRuntimeException(cacheProblem); |
|     } catch (Exception other) { |
|       addProcessingException(other); |
|     } |
|   } |
| } |
| |
| private void processCacheRuntimeException(CacheRuntimeException problem) { |
| if (problem instanceof RegionDestroyedException) { // catch RegionDestroyedException |
| addProcessingException(problem); |
| } else if (problem instanceof CancelException) { // catch CacheClosedException |
| addProcessingException(problem); |
| throw problem; |
| } else { // catch CacheRuntimeException |
| addProcessingException(problem); |
| logger.error(LocalizedMessage.create( |
| LocalizedStrings.TXCommitMessage_TRANSACTION_MESSAGE_0_FROM_SENDER_1_FAILED_PROCESSING_UNKNOWN_TRANSACTION_STATE_2, |
| new Object[] { this, getSender(), problem})); |
| } |
| } |
| |
| private void ack() { |
| if (this.processorId != 0) { |
| CommitReplyException replyEx = null; |
| if (!this.processingExceptions.isEmpty()) { |
| replyEx = new CommitReplyException(LocalizedStrings.TXCommitMessage_COMMIT_OPERATION_GENERATED_ONE_OR_MORE_EXCEPTIONS_FROM_0.toLocalizedString(this.getSender()), this.processingExceptions); |
| } |
| ReplyMessage.send(getSender(), this.processorId, replyEx, this.dm); |
| } |
| } |
| |
| /** |
|  * @return the fixed serialization id of this message, always |
|  *         TX_COMMIT_MESSAGE |
|  */ |
| public int getDSFID() { |
|   // Disabled alternative (kept as a note): send TX_COMMIT_MESSAGE_701 with |
|   // the WAN shadowKey when shouldSend701Message() holds, and fall back to |
|   // the old TX_COMMIT_MESSAGE when at least one 7.0 member is in the |
|   // system. 7.0.1 members are able to deserialize either form. |
|   return TX_COMMIT_MESSAGE; |
| } |
| |
| /* |
| /** |
| * Do not send shadowKey to clients or when there are member(s) older than |
| * 7.0.1. |
| * |
| private boolean shouldSend701Message() { |
| if (this.clientVersion == null |
| && this.getDM().getMembersWithOlderVersion("7.0.1").isEmpty()) { |
| return true; |
| } |
| return false; |
| } |
| |
| public boolean shouldReadShadowKey() { |
| return this.shouldReadShadowKey; |
| } |
| |
| public void setShouldReadShadowKey(boolean shouldReadShadowKey) { |
| this.shouldReadShadowKey = shouldReadShadowKey; |
| } |
| |
| public boolean shouldWriteShadowKey() { |
| return this.shouldWriteShadowKey; |
| } |
| */ |
| |
| /** |
|  * Reads this message from {@code in}, in the order toData writes it. |
|  * The processor id from the stream is only kept when an ack is required; |
|  * otherwise it is replaced by -1. If reading a region commit raises a |
|  * CacheClosedException, it is recorded and reading stops early, leaving |
|  * the rest of the stream unread. |
|  * |
|  * @param in the stream to read from |
|  * @throws IOException if reading from the stream fails |
|  * @throws ClassNotFoundException if a serialized class cannot be resolved |
|  */ |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
|   final int wireProcessorId = in.readInt(); |
|   if (!isAckRequired()) { |
|     this.processorId = -1; |
|   } else { |
|     this.processorId = wireProcessorId; |
|     ReplyProcessor21.setMessageRPId(this.processorId); |
|   } |
|   this.txIdent = TXId.createFromData(in); |
|   final boolean hasLockId = in.readBoolean(); |
|   if (hasLockId) { |
|     this.lockId = TXLockIdImpl.createFromData(in); |
|   } |
|   final int totalMaxSize = in.readInt(); |
|   // base event id parts of the originating transaction |
|   this.farsideBaseMembershipId = DataSerializer.readByteArray(in); |
|   this.farsideBaseThreadId = in.readLong(); |
|   this.farsideBaseSequenceId = in.readLong(); |
|   this.needsLargeModCount = in.readBoolean(); |
|   final int regionsSize = in.readInt(); |
|   this.regions = new ArrayList(regionsSize); |
|   this.farSideEntryOps = new ArrayList(totalMaxSize); |
|   for (int index = 0; index < regionsSize; index++) { |
|     final RegionCommit commit = new RegionCommit(this); |
|     try { |
|       commit.fromData(in); |
|     } catch (CacheClosedException closed) { |
|       addProcessingException(closed); |
|       // return to avoid serialization error being sent in reply |
|       return; |
|     } |
|     this.regions.add(commit); |
|   } |
|   this.bridgeContext = ClientProxyMembershipID.readCanonicalized(in); |
|   this.farSiders = DataSerializer.readHashSet(in); |
| } |
| |
| /** |
| * Return true if a distributed ack message is required. On the client side of a transaction, |
| * this returns false, while returning true elsewhere. |
| * |
| * @return requires ack message or not |
| */ |
| private boolean isAckRequired() { |
| return this.ackRequired; |
| } |
| |
| |
| /** |
|  * Sets whether an ack is required; the flag defaults to true. Turning it |
|  * off also sets the processor id to -1. |
|  * |
|  * @param a true if an ack is required, false if not (false on clients) |
|  */ |
| public void setAckRequired(boolean a) { |
|   this.ackRequired = a; |
|   if (a) { |
|     return; |
|   } |
|   this.processorId = -1; |
| } |
| |
| /** |
|  * Writes this message to {@code out}. The order is: processor id, tx id, |
|  * optional lock id behind a presence flag, summed maxSize of all regions, |
|  * the base event id parts, the large-mod-count flag, the region count and |
|  * each region, the bridge context, and the far siders. The base event id |
|  * parts and the flag come from txState when it is set, otherwise from the |
|  * far-side fields of this message. |
|  * |
|  * @param out the stream to write to |
|  * @throws IOException if writing to the stream fails |
|  */ |
| @Override |
| public void toData(DataOutput out) throws IOException { |
|   out.writeInt(this.processorId); |
|   InternalDataSerializer.invokeToData(this.txIdent, out); |
|   if (this.lockId == null) { |
|     out.writeBoolean(false); |
|   } else { |
|     out.writeBoolean(true); |
|     InternalDataSerializer.invokeToData(this.lockId, out); |
|   } |
|   int regionsSize = 0; |
|   int totalMaxSize = 0; |
|   if (this.regions != null) { |
|     regionsSize = this.regions.size(); |
|     for (int index = 0; index < this.regions.size(); index++) { |
|       totalMaxSize += ((RegionCommit) this.regions.get(index)).maxSize; |
|     } |
|   } |
|   out.writeInt(totalMaxSize); |
|   if (this.txState != null) { |
|     // values come from the local transaction state |
|     DataSerializer.writeByteArray(this.txState.getBaseMembershipId(), out); |
|     out.writeLong(this.txState.getBaseThreadId()); |
|     out.writeLong(this.txState.getBaseSequenceId()); |
|     DataSerializer.writeBoolean(this.txState.needsLargeModCount(),out); |
|   } else { |
|     // no local state: write the values held in the far-side fields |
|     DataSerializer.writeByteArray(this.farsideBaseMembershipId, out); |
|     out.writeLong(this.farsideBaseThreadId); |
|     out.writeLong(this.farsideBaseSequenceId); |
|     DataSerializer.writeBoolean(this.needsLargeModCount,out); |
|   } |
|   out.writeInt(regionsSize); |
|   if (regionsSize > 0) { |
|     for (int index = 0; index < this.regions.size(); index++) { |
|       final RegionCommit commit = (RegionCommit) this.regions.get(index); |
|       commit.toData(out); |
|     } |
|   } |
|   DataSerializer.writeObject(bridgeContext,out); |
|   DataSerializer.writeHashSet(this.farSiders, out); |
| } |
| |
| |
| @Override |
| public String toString() { |
| StringBuffer result = new StringBuffer(256); |
| result.append("TXCommitMessage@") |
| .append(System.identityHashCode(this)) |
| .append("#") |
| .append(this.sequenceNum) |
| .append(" processorId=") |
| .append(this.processorId) |
| .append(" txId=") |
| .append(this.txIdent); |
| |
| if (this.farSiders!=null) { |
| Iterator fs = this.farSiders.iterator(); |
| result.append(" farSiders="); |
| while(fs.hasNext()) { |
| result.append(fs.next()); |
| if (fs.hasNext()) { |
| result.append(' '); |
| } |
| } |
| } else { |
| result.append(" farSiders=<null>"); |
| } |
| if (this.regions != null) { |
| Iterator it = this.regions.iterator(); |
| while (it.hasNext()) { |
| result.append(' ') |
| .append(it.next()); |
| } |
| } |
| return result.toString(); |
| } |
| |
| /** |
| * Combines a set of small TXCommitMessages that belong to one transaction |
| * into a txCommitMessage that represents an entire transaction. At commit |
| * time the txCommitMessage sent to each node can be a subset of the transaction, |
| * this method will combine those subsets into a complete message. |
| * @param msgSet |
| * @return the complete txCommitMessage |
| */ |
| public static TXCommitMessage combine(Set<TXCommitMessage> msgSet) { |
| assert msgSet != null; |
| TXCommitMessage firstPart = null; |
| Iterator<TXCommitMessage> it = msgSet.iterator(); |
| while (it.hasNext()) { |
| if (firstPart == null) { |
| firstPart = it.next(); |
| continue; |
| } |
| firstPart.combine(it.next()); |
| } |
| return firstPart; |
| } |
| |
| /** |
| * Combines the other TXCommitMessage into this message. |
| * Used to compute complete TXCommitMessage from parts. |
| * @param other |
| */ |
| public void combine(TXCommitMessage other) { |
| assert other != null; |
| Iterator it = other.regions.iterator(); |
| while (it.hasNext()) { |
| RegionCommit rc = (RegionCommit) it.next(); |
| if (!this.regions.contains(rc)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("TX: adding region commit: {} to: {}", rc, this); |
| } |
| rc.msg = this; |
| this.regions.add(rc); |
| } |
| } |
| } |
| |
| public final static class RegionCommitList extends ArrayList<RegionCommit> { |
| private static final long serialVersionUID = -8910813949027683641L; |
| private transient boolean needsAck = false; |
| private transient RegionCommit trimRC = null; |
| private transient RegionCommitList trimChild = null; |
| |
| public RegionCommitList() { |
| super(); |
| } |
| |
| public RegionCommitList(RegionCommitList c) { |
| super(c); |
| } |
| |
| public boolean getNeedsAck() { |
| return this.needsAck; |
| } |
| @Override // GemStoneAddition |
| public boolean add(RegionCommit o) { |
| RegionCommit rc = (RegionCommit)o; |
| rc.incRefCount(); |
| if (!this.needsAck && rc.needsAck()) { |
| this.needsAck = true; |
| } |
| return super.add(o); |
| } |
| /** |
| * Creates a new list, if needed, that contains all the elements |
| * of the specified old list except the last one if it is 'rc'. |
| * Also recomputes needsAck field. |
| */ |
| public RegionCommitList trim(RegionCommit rc) { |
| if (get(size()-1) != rc) { |
| // no need to trim because it does not contain rc |
| return this; |
| } |
| if (this.trimRC == rc) { |
| return this.trimChild; |
| } |
| RegionCommitList result = new RegionCommitList(this); |
| this.trimRC = rc; |
| this.trimChild = result; |
| result.remove(result.size()-1); |
| { |
| Iterator it = result.iterator(); |
| while (it.hasNext()) { |
| RegionCommit itrc = (RegionCommit)it.next(); |
| itrc.incRefCount(); |
| if (itrc.needsAck()) { |
| result.needsAck = true; |
| } |
| } |
| } |
| return result; |
| } |
| @Override |
| public String toString() { |
| StringBuffer result = new StringBuffer(256); |
| result.append('@') |
| .append(System.identityHashCode(this)) |
| .append(' ') |
| .append(super.toString()); |
| return result.toString(); |
| } |
| } |
| |
| public static class RegionCommit { |
| /** |
| * The region that this commit represents. |
| * Valid on both nearside and farside. |
| */ |
| protected transient LocalRegion r; |
| /** |
| * Valid only on farside. |
| */ |
| private String regionPath; |
| private String parentRegionPath; |
| /** |
| * The message this region commit is a part of. |
| * Valid on both farside and nearside. |
| */ |
| private transient TXCommitMessage msg; |
| /** |
| * Number of RegionCommitList instances that have this RegionCommit in them |
| * Valid only on nearside. |
| */ |
| private transient int refCount = 0; |
| /** |
| * Valid only on nearside. |
| */ |
| private transient HeapDataOutputStream preserializedBuffer = null; |
| /** |
| * Upperbound on the number of operations this region could possibly have |
| * Valid only on nearside. |
| */ |
| transient int maxSize; |
| /** |
| * A list of Object; each one is the entry key for a distributed operation |
| * done by this transaction. |
| * The list must be kept in sync with opEntries. |
| * Valid only on nearside. |
| */ |
| private transient ArrayList opKeys; |
| /** |
| * A list of TXEntryState; each one is the entry info for a distributed operation |
| * done by this transaction. |
| * The list must be kept in sync with opKeys. |
| * Valid only on nearside. |
| */ |
| private transient ArrayList opEntries; |
| |
| private transient VersionSource memberId; |
| |
| /** |
| * The persistent ids of the peers for this region. |
| * Used to mark peers as offline if they do not apply the commit |
| * due to a cache close. |
| */ |
| public Map<InternalDistributedMember, PersistentMemberID> persistentIds; |
| |
/**
 * Nearside constructor: the region and the operation upper bound are known
 * up front.
 *
 * @param msg the commit message this region commit is part of
 * @param r the region this commit represents
 * @param maxSize upper bound on the number of operations for the region
 */
RegionCommit(TXCommitMessage msg, LocalRegion r, int maxSize) {
  this.maxSize = maxSize;
  this.r = r;
  this.msg = msg;
}
| |
/**
 * Farside constructor. The region is hooked up later and maxSize is never
 * set, so both keep their default values.
 *
 * @param msg the commit message this region commit is part of
 */
RegionCommit(TXCommitMessage msg) {
  this(msg, null, 0);
}
| |
| public void incRefCount() { |
| this.refCount++; |
| } |
| |
| /** |
| * Valid on farside after beginProcess. |
| * Used to remember what to do at region cleanup time |
| */ |
| private boolean needsUnlock; |
| /** |
| * Valid on farside after beginProcess. |
| * Used to remember what to do at region cleanup time |
| */ |
| private boolean needsLRUEnd; |
| /** |
| * Valid on farside after beginProcess |
| * This is the txEvent that should be used by this RegionCommit |
| */ |
| private TXRmtEvent txEvent; |
| |
| /** |
| * Called to setup a region commit so its entryOps can be processed |
| * @return true if region should be processed; false if it can be ignored |
| * @throws CacheClosedException if the cache has been closed |
| */ |
| boolean beginProcess(DM dm, TransactionId txIdent, TXRmtEvent txEvent) |
| throws CacheClosedException |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("begin processing TXCommitMessage {} for region {}", txIdent, this.regionPath); |
| } |
| try { |
| if (!hookupRegion(dm)) { |
| return false; |
| } |
| if (msg.isAckRequired() && (this.r == null || !this.r.getScope().isDistributed())) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Received unneeded commit data for region {}", this.regionPath); |
| } |
| this.msg.addProcessingException(new RegionDestroyedException(LocalizedStrings.TXCommitMessage_REGION_NOT_FOUND.toLocalizedString(), this.regionPath)); |
| this.r = null; |
| return false; |
| } |
| this.needsUnlock = this.r.lockGII(); |
| this.r.txLRUStart(); |
| this.needsLRUEnd = true; |
| if (this.r.isInitialized()) { |
| // We don't want the txEvent to know anything about our regions |
| // that are still doing gii. |
| this.txEvent = txEvent; |
| } |
| } |
| catch (RegionDestroyedException e) { |
| this.msg.addProcessingException(e); |
| // Region destroyed: Update cancelled |
| if (logger.isDebugEnabled()) { |
| logger.debug("Received unneeded commit data for region {} because the region was destroyed.", this.regionPath, e); |
| } |
| this.r = null; |
| } |
| return this.r != null; |
| } |
| private boolean hookupRegion(DM dm) { |
| this.r = LocalRegion.getRegionFromPath(dm.getSystem(), this.regionPath); |
| if(this.r==null && this.parentRegionPath!=null) { |
| this.r = LocalRegion.getRegionFromPath(dm.getSystem(), this.parentRegionPath); |
| this.regionPath = this.parentRegionPath; |
| } |
| if (this.r == null && dm.getSystem().isLoner()) { |
| // If there are additional regions that the server enlisted in the tx, |
| // which the client does not have, the client can just ignore the region |
| // see bug 51922 |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Called when processing is complete; only needs to be called if beginProcess |
| * returned true. |
| */ |
| void endProcess() { |
| if (this.r != null) { |
| try { |
| if (this.needsLRUEnd) { |
| this.needsLRUEnd = false; |
| this.r.txLRUEnd(); |
| } |
| } finally { |
| if (this.needsUnlock) { |
| this.needsUnlock = false; |
| this.r.unlockGII(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the eventId to use for the give farside entry op. |
| * @since 5.7 |
| */ |
| private EventID getEventId(FarSideEntryOp entryOp) { |
| return this.msg.getEventId(entryOp.eventOffset); |
| } |
| |
| |
| /** |
| * Apply a single tx entry op on the far side |
| */ |
| @SuppressWarnings("synthetic-access") |
| protected void txApplyEntryOp(FarSideEntryOp entryOp) |
| { |
| if (this.r == null) { |
| return; |
| } |
| EventID eventID = getEventId(entryOp); |
| boolean isDuplicate = this.r.hasSeenEvent(eventID); |
| boolean callbacksOnly = (this.r.getDataPolicy() == DataPolicy.PARTITION) |
| || isDuplicate; |
| if (this.r instanceof PartitionedRegion) { |
| /* |
| * This happens when we don't have the bucket and are getting adjunct notification |
| */ |
| EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey); |
| try { |
| if(entryOp.filterRoutingInfo!=null) { |
| eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId())); |
| } |
| if (isDuplicate) { |
| eei.setPossibleDuplicate(true); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("invoking transactional callbacks for {} key={} needsUnlock={} event={}", entryOp.op, entryOp.key, this.needsUnlock, eei); |
| } |
| // we reach this spot because the event is either delivered to this member |
| // as an "adjunct" message or because the bucket was being created when |
| // the message was sent and already reflects the change caused by this event. |
| // In the latter case we need to invoke listeners |
| final boolean skipListeners = !isDuplicate; |
| eei.invokeCallbacks(this.r, skipListeners, true); |
| } finally { |
| eei.release(); |
| } |
| return; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("applying transactional {} key={} needsUnlock={} eventId {} with routing {}", |
| entryOp.op, entryOp.key, this.needsUnlock, getEventId(entryOp), entryOp.filterRoutingInfo); |
| } |
| if (entryOp.versionTag != null) { |
| entryOp.versionTag.replaceNullIDs(this.msg.getSender()); |
| } |
| if (entryOp.op.isDestroy()) { |
| this.r.txApplyDestroy(entryOp.key, |
| this.msg.txIdent, |
| this.txEvent, |
| this.needsUnlock, |
| entryOp.op, |
| getEventId(entryOp), |
| entryOp.callbackArg, |
| null /* fire inline, no pending callbacks */, |
| entryOp.filterRoutingInfo, |
| this.msg.bridgeContext, |
| false /* origin remote */, |
| null/*txEntryState*/, |
| entryOp.versionTag, |
| entryOp.tailKey); |
| } else if (entryOp.op.isInvalidate()) { |
| this.r.txApplyInvalidate(entryOp.key, |
| Token.INVALID, |
| entryOp.didDestroy, |
| this.msg.txIdent, |
| this.txEvent, |
| false /*localOp*/, |
| getEventId(entryOp), |
| entryOp.callbackArg, |
| null /* fire inline, no pending callbacks */, |
| entryOp.filterRoutingInfo, |
| this.msg.bridgeContext, |
| null/*txEntryState*/, |
| entryOp.versionTag, |
| entryOp.tailKey); |
| } else { |
| this.r.txApplyPut(entryOp.op, |
| entryOp.key, |
| entryOp.value, |
| entryOp.didDestroy, |
| this.msg.txIdent, |
| this.txEvent, |
| getEventId(entryOp), |
| entryOp.callbackArg, |
| null /* fire inline, no pending callbacks */, |
| entryOp.filterRoutingInfo, |
| this.msg.bridgeContext, |
| null/*txEntryState*/, |
| entryOp.versionTag, |
| entryOp.tailKey); |
| } |
| } |
| |
| boolean isEmpty() { |
| return this.opKeys == null; |
| } |
| /** |
| * Returns the number of operations this region commit will do |
| * @since 5.0 |
| */ |
| int getOperationCount() { |
| int result = 0; |
| if (!isEmpty()) { |
| result = this.opKeys.size(); |
| } |
| return result; |
| } |
| boolean needsAck() { |
| return this.r.getScope().isDistributedAck(); |
| } |
| void addOp(Object key, TXEntryState entry) { |
| if (this.opKeys == null) { |
| this.opKeys = new ArrayList(this.maxSize); |
| this.opEntries = new ArrayList(this.maxSize); |
| } |
| this.opKeys.add(key); |
| this.opEntries.add(entry); |
| } |
| |
| |
| public boolean isForceFireEvent(DM dm) { |
| LocalRegion r = LocalRegion.getRegionFromPath(dm.getSystem(), this.regionPath); |
| if (r instanceof PartitionedRegion |
| || (r != null && r.isUsedForPartitionedRegionBucket())) { |
| return false; |
| } |
| return true; |
| } |
| |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| this.regionPath = DataSerializer.readString(in); |
| this.parentRegionPath = DataSerializer.readString(in); |
| |
| int size = in.readInt(); |
| if (size > 0) { |
| this.opKeys = new ArrayList(size); |
| this.opEntries = new ArrayList(size); |
| final boolean largeModCount = in.readBoolean(); |
| this.memberId = DataSerializer.readObject(in); |
| for (int i=0; i < size; i++) { |
| FarSideEntryOp entryOp = new FarSideEntryOp(); |
| // shadowkey is not being sent to clients |
| entryOp.fromData(in, largeModCount, !this.msg.getDM().isLoner()); |
| if (entryOp.versionTag != null && this.memberId != null) { |
| entryOp.versionTag.setMemberID(this.memberId); |
| } |
| this.msg.addFarSideEntryOp(entryOp); |
| this.opKeys.add(entryOp.key); |
| this.opEntries.add(entryOp); |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuffer result = new StringBuffer(64); |
| if (this.regionPath != null) { |
| result.append(this.regionPath); |
| } else { |
| result.append(this.r.getFullPath()); |
| } |
| if (this.refCount > 0) { |
| result.append(" refCount=") |
| .append(this.refCount); |
| } |
| return result.toString(); |
| } |
| |
| /** |
| * Returns a list of QueuedOperation instances for reliable distribution |
| * @since 5.0 |
| */ |
| List getOperations() { |
| QueuedOperation[] ops = new QueuedOperation[getOperationCount()]; |
| for (int i=0; i < ops.length; i++) { |
| TXEntryState es = (TXEntryState)this.opEntries.get(i); |
| ops[i] = es.toFarSideQueuedOp(this.opKeys.get(i)); |
| } |
| return Arrays.asList(ops); |
| } |
| |
| private void basicToData(DataOutput out) throws IOException { |
| if(this.r!=null) { |
| DataSerializer.writeString(this.r.getFullPath(), out); |
| if(this.r instanceof BucketRegion) { |
| DataSerializer.writeString(((BucketRegion)this.r).getPartitionedRegion().getFullPath(), out); |
| } else { |
| DataSerializer.writeString(null, out); |
| } |
| } else { |
| DataSerializer.writeString(this.regionPath, out); |
| DataSerializer.writeString(this.parentRegionPath,out); |
| } |
| |
| if (isEmpty() || this.opKeys.size()==0) { |
| out.writeInt(0); |
| } else { |
| int size = this.opKeys.size(); |
| out.writeInt(size); |
| |
| final boolean largeModCount; |
| if(this.msg.txState!=null) { |
| largeModCount = this.msg.txState.needsLargeModCount(); |
| } else { |
| largeModCount = this.msg.needsLargeModCount; |
| } |
| out.writeBoolean(largeModCount); |
| |
| final boolean sendVersionTags = this.msg.clientVersion == null |
| || Version.GFE_70.compareTo(this.msg.clientVersion) <= 0; |
| if (sendVersionTags) { |
| VersionSource member = this.memberId; |
| if (member == null) { |
| if (this.r == null) { |
| Assert.assertTrue(this.msg.txState == null); |
| } else { |
| member = this.r.getVersionMember(); |
| } |
| } |
| DataSerializer.writeObject(member, out); |
| } |
| for (int i=0; i < size; i++) { |
| DataSerializer.writeObject(this.opKeys.get(i), out); |
| if(this.msg.txState!=null) { |
| /* we are still on tx node and have the entry state */ |
| ((TXEntryState)this.opEntries.get(i)).toFarSideData(out, largeModCount, sendVersionTags, this.msg.clientVersion == null); |
| } else { |
| ((FarSideEntryOp)this.opEntries.get(i)).toData(out,largeModCount, sendVersionTags, this.msg.clientVersion == null); |
| } |
| } |
| } |
| } |
| |
| |
| public void toData(DataOutput out) throws IOException { |
| if (this.preserializedBuffer != null) { |
| this.preserializedBuffer.rewind(); |
| this.preserializedBuffer.sendTo(out); |
| } else if (this.refCount > 1) { |
| Version v = InternalDataSerializer.getVersionForDataStream(out); |
| HeapDataOutputStream hdos = new HeapDataOutputStream(1024, v); |
| basicToData(hdos); |
| this.preserializedBuffer = hdos; |
| this.preserializedBuffer.sendTo(out); |
| } else { |
| basicToData(out); |
| } |
| } |
| /** |
| * Holds data that describes a tx entry op on the far side. |
| * @since 5.0 |
| */ |
| public class FarSideEntryOp implements Comparable { |
| public Operation op; |
| public int modSerialNum; |
| public int eventOffset; |
| public Object key; |
| public Object value; |
| public boolean didDestroy; |
| public Object callbackArg; |
| private FilterRoutingInfo filterRoutingInfo; |
| private VersionTag versionTag; |
| private long tailKey; |
| /** |
| * Create a new representation of a tx entry op on the far side. |
| * All init will be done by a call to fromData |
| */ |
| public FarSideEntryOp() { |
| } |
| /** |
| * Creates and returns a new instance of a tx entry op on the far side. |
| * The "toData" that this should match is {@link TXEntryState#toFarSideData}. |
| * @param in the data input that is used to read the data for this entry op |
| * @param largeModCount true if the mod count is a int instead of a byte. |
| * @param readShadowKey true if a long shadowKey should be read |
| * @throws ClassNotFoundException |
| */ |
| public void fromData(DataInput in, boolean largeModCount, boolean readShadowKey) |
| throws IOException, ClassNotFoundException |
| { |
| this.key = DataSerializer.readObject(in); |
| this.op = Operation.fromOrdinal(in.readByte()); |
| if (largeModCount) { |
| this.modSerialNum = in.readInt(); |
| } else { |
| this.modSerialNum = in.readByte(); |
| } |
| this.callbackArg = DataSerializer.readObject(in); |
| this.filterRoutingInfo = DataSerializer.readObject(in); |
| this.versionTag = DataSerializer.readObject(in); |
| if (readShadowKey) { |
| this.tailKey = in.readLong(); |
| } |
| this.eventOffset = in.readInt(); |
| if (!this.op.isDestroy()) { |
| this.didDestroy = in.readBoolean(); |
| if (!this.op.isInvalidate()) { |
| boolean isToken = in.readBoolean(); |
| if (isToken) { |
| this.value = DataSerializer.readObject(in); |
| } else { |
| this.value = CachedDeserializableFactory.create(DataSerializer.readByteArray(in)); |
| } |
| } |
| } |
| } |
| |
| public void toData(DataOutput out, boolean largeModCount, boolean sendVersionTag, boolean sendShadowKey) |
| throws IOException |
| { |
| // DataSerializer.writeObject(this.key,out); |
| /* Don't serialize key because caller did that already */ |
| |
| out.writeByte(this.op.ordinal); |
| if(largeModCount) { |
| out.writeInt(this.modSerialNum); |
| } else { |
| out.writeByte(this.modSerialNum); |
| } |
| DataSerializer.writeObject(this.callbackArg,out); |
| DataSerializer.writeObject(this.filterRoutingInfo,out); |
| if (sendVersionTag) { |
| DataSerializer.writeObject(this.versionTag, out); |
| } |
| if (sendShadowKey) { |
| out.writeLong(this.tailKey); |
| } |
| out.writeInt(this.eventOffset); |
| if(!this.op.isDestroy()) { |
| out.writeBoolean(this.didDestroy); |
| if(!this.op.isInvalidate()) { |
| boolean sendObject = Token.isInvalidOrRemoved(this.value); |
| sendObject = sendObject || this.value instanceof byte[]; |
| out.writeBoolean(sendObject); |
| if (sendObject) { |
| DataSerializer.writeObject(this.value, out); |
| } else { |
| DataSerializer.writeObjectAsByteArray(this.value, out); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * Performs this entryOp on the farside of a tx commit. |
| */ |
| public void process() { |
| txApplyEntryOp(this); |
| } |
| |
| public RegionCommit getRegionCommit() { |
| return RegionCommit.this; |
| } |
| /** |
| * Returns the value to use to sort us |
| */ |
| private int getSortValue() { |
| return this.modSerialNum; |
| } |
| public int compareTo(Object o) { |
| FarSideEntryOp other = (FarSideEntryOp)o; |
| return getSortValue() - other.getSortValue(); |
| } |
| @Override |
| public boolean equals(Object o) { |
| if (o == null || !(o instanceof FarSideEntryOp)) return false; |
| return compareTo(o) == 0; |
| } |
| @Override |
| public int hashCode() { |
| return getSortValue(); |
| } |
| } |
| } |
| |
| final Object getTrackerKey() { |
| if (this.lockId != null) { |
| return this.lockId; |
| } else { |
| return this.txIdent; |
| } |
| } |
| |
| /** |
| * Used to prevent processing of the message if |
| * we have reported to other FarSiders that |
| * we did not received the CommitProcessMessage |
| */ |
| final boolean dontProcess() { |
| return this.dontProcess; |
| } |
| |
| /** |
| * Indicate that this message should not be processed |
| * if we receive CommitProcessMessage (late) |
| */ |
| final void setDontProcess() { |
| this.dontProcess = true; |
| } |
| |
| final boolean isProcessing() { |
| return this.isProcessing; |
| } |
| |
| private final void setIsProcessing(boolean isProcessing) { |
| this.isProcessing = isProcessing; |
| } |
| |
| final boolean wasProcessed() { |
| return this.wasProcessed; |
| } |
| |
| final void setProcessed(boolean wasProcessed) { |
| this.wasProcessed = wasProcessed; |
| } |
| |
| /********************* Region Commit Process Messages ***************************************/ |
| |
| /** |
| * The CommitProcessForLockIDMessaage is sent by the Distributed |
| * ACK TX origin to the recipients (aka FarSiders) to indicate that |
| * a previously received RegionCommit that contained a lockId should commence |
| * processing. |
| */ |
| static final public class CommitProcessForLockIdMessage extends CommitProcessMessage { |
| private TXLockId lockId; |
| |
| public CommitProcessForLockIdMessage() { |
| // Zero arg constructor for DataSerializer |
| } |
| public CommitProcessForLockIdMessage(TXLockId lockId) { |
| this.lockId = lockId; |
| Assert.assertTrue(this.lockId != null, "CommitProcessForLockIdMessage must have a non-null lockid!"); |
| } |
| @Override |
| protected void process(DistributionManager dm) { |
| final TXCommitMessage mess = waitForMessage(this.lockId, dm); |
| Assert.assertTrue(mess != null, "Commit data for TXLockId: " + this.lockId + " not found"); |
| basicProcess(mess, dm); |
| } |
| public int getDSFID() { |
| return COMMIT_PROCESS_FOR_LOCKID_MESSAGE; |
| } |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| InternalDataSerializer.invokeToData(this.lockId, out); |
| } |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| this.lockId = TXLockIdImpl.createFromData(in); |
| Assert.assertTrue(this.lockId != null, "CommitProcessForLockIdMessage must have a non-null lockid!"); |
| } |
| @Override |
| public String toString() { |
| StringBuffer result = new StringBuffer(128); |
| result.append("CommitProcessForLockIdMessage@") |
| .append(System.identityHashCode(this)) |
| .append(" lockId=") |
| .append(this.lockId); |
| return result.toString(); |
| } |
| } |
| |
| /** |
| * The CommitProcessForTXIdMessaage is sent by the Distributed ACK |
| * TX origin to the recipients (aka FarSiders) to indicate that a |
| * previously received RegionCommit that contained a TXId should |
| * commence processing. RegionCommit messages that contain a TXId |
| * (and no TXLockId) are typically sent if all the TX changes are a |
| * result of load/netsearch/netload values (thus no lockid) |
| */ |
| static final public class CommitProcessForTXIdMessage extends CommitProcessMessage { |
| private TXId txId; |
| |
| public CommitProcessForTXIdMessage() { |
| // Zero arg constructor for DataSerializer |
| } |
| public CommitProcessForTXIdMessage(TXId txId) { |
| this.txId = txId; |
| Assert.assertTrue(this.txId != null, "CommitProcessMessageForTXId must have a non-null txid!"); |
| } |
| @Override |
| protected void process(DistributionManager dm) { |
| final TXCommitMessage mess = waitForMessage(this.txId, dm); |
| Assert.assertTrue(mess != null, "Commit data for TXId: " + this.txId + " not found"); |
| basicProcess(mess, dm); |
| } |
| public int getDSFID() { |
| return COMMIT_PROCESS_FOR_TXID_MESSAGE; |
| } |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| InternalDataSerializer.invokeToData(this.txId, out); |
| } |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| this.txId = TXId.createFromData(in); |
| Assert.assertTrue(this.txId != null, "CommitProcessMessageForTXId must have a non-null txid!"); |
| } |
| @Override |
| public String toString() { |
| StringBuffer result = new StringBuffer(128); |
| result.append("CommitProcessForTXIdMessage@") |
| .append(System.identityHashCode(this)) |
| .append(" txId=") |
| .append(this.txId); |
| return result.toString(); |
| } |
| } |
| static abstract public class CommitProcessMessage extends PooledDistributionMessage { |
| protected final void basicProcess(final TXCommitMessage mess, final DistributionManager dm) { |
| dm.removeMembershipListener(mess); |
| synchronized(mess) { |
| if (mess.dontProcess()) { |
| return; |
| } |
| } |
| try { |
| mess.basicProcess(); |
| } finally { |
| txTracker.processed(mess); |
| } |
| } |
| } |
| |
| /********************* Commit Process Query Message ***************************************/ |
| |
| /** |
| * The CommitProcessQueryMessage is used to attempt to recover - in |
| * the Distributed ACK TXs - when the origin of the CommitProcess |
| * messages departed from the distributed system. The sender of |
| * this message is attempting to query other potential fellow |
| * FarSiders (aka recipients) who may have received the |
| * CommitProcess message. |
| * |
| * Since the occurance of this message will be rare (hopefully), it |
| * was decided to be general about the the tracker key - opting not |
| * to have specific messages for each type like |
| * CommitProcessFor<Lock/TX>Id - and take the performance penalty |
| * of an extra call to DataSerializer |
| * |
| */ |
| static final public class CommitProcessQueryMessage extends PooledDistributionMessage { |
| private Object trackerKey; // Either a TXLockId or a TXId |
| private int processorId; |
| |
| public CommitProcessQueryMessage() { |
| // Zero arg constructor for DataSerializer |
| } |
| |
| public CommitProcessQueryMessage(Object trackerKey, int processorId) { |
| this.trackerKey = trackerKey; |
| this.processorId = processorId; |
| } |
| |
| @Override |
| protected void process(DistributionManager dm) { |
| final boolean processMsgReceived = txTracker.commitProcessReceived(this.trackerKey, dm); |
| if (!processMsgReceived) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CommitProcessQuery did not find {} in the history", this.trackerKey); |
| } |
| } |
| |
| // Reply to the fellow FarSider as to whether the |
| // CommitProcess message was received |
| CommitProcessQueryReplyMessage resp = |
| new CommitProcessQueryReplyMessage(processMsgReceived); |
| resp.setProcessorId(this.processorId); |
| resp.setRecipient(this.getSender()); |
| dm.putOutgoing(resp); |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| DataSerializer.writeObject(this.trackerKey, out); |
| out.writeInt(this.processorId); |
| } |
| |
| public int getDSFID() { |
| return COMMIT_PROCESS_QUERY_MESSAGE; |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| this.trackerKey = DataSerializer.readObject(in); |
| this.processorId = in.readInt(); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuffer result = new StringBuffer(128); |
| result.append("CommitProcessQueryMessage@") |
| .append(System.identityHashCode(this)) |
| .append(" trackerKeyClass=") |
| .append(this.trackerKey.getClass().getName()) |
| .append(" trackerKey=") |
| .append(this.trackerKey) |
| .append(" processorId=") |
| .append(this.processorId); |
| return result.toString(); |
| } |
| } |
| |
| /********************* Commit Process Query Response Message **********************************/ |
| static final public class CommitProcessQueryReplyMessage extends ReplyMessage { |
| private boolean wasReceived; |
| |
| public CommitProcessQueryReplyMessage(boolean wasReceived) { |
| this.wasReceived = wasReceived; |
| } |
| public CommitProcessQueryReplyMessage() { |
| // zero arg constructor for DataSerializer |
| } |
| |
| public boolean wasReceived() { |
| return wasReceived; |
| } |
| |
| @Override |
| public int getDSFID() { |
| return COMMIT_PROCESS_QUERY_REPLY_MESSAGE; |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.wasReceived = in.readBoolean(); |
| } |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| out.writeBoolean(this.wasReceived); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuffer result = new StringBuffer(128); |
| result.append("CommitProcessQueryReplyMessage@") |
| .append(System.identityHashCode(this)) |
| .append(" wasReceived=") |
| .append(this.wasReceived) |
| .append(" processorId=") |
| .append(this.processorId) |
| .append(" from ") |
| .append(this.getSender()); |
| return result.toString(); |
| } |
| } |
| |
| /********************* Commit Process Query Response Processor *********************************/ |
| static final public class CommitProcessQueryReplyProcessor extends ReplyProcessor21 { |
| public boolean receivedOnePositive; |
| CommitProcessQueryReplyProcessor(DM dm, Set members) { |
| super(dm, members); |
| this.receivedOnePositive = false; |
| } |
| |
| @Override |
| public void process(DistributionMessage msg) { |
| CommitProcessQueryReplyMessage ccMess = (CommitProcessQueryReplyMessage) msg; |
| if (ccMess.wasReceived()) { |
| this.receivedOnePositive = true; |
| } |
| super.process(msg); |
| } |
| |
| @Override |
| final protected boolean canStopWaiting() { |
| return this.receivedOnePositive; |
| } |
| |
| final public boolean receivedACommitProcessMessage() { |
| return this.receivedOnePositive; |
| } |
| } |
| |
| /********************* MembershipListener Implementation ***************************************/ |
| public void memberJoined(InternalDistributedMember id) { |
| } |
| |
| public void memberSuspect(InternalDistributedMember id, |
| InternalDistributedMember whoSuspected) { |
| } |
| |
| public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) { |
| } |
| |
| public void memberDeparted(final InternalDistributedMember id, boolean crashed) { |
| if ( ! getSender().equals(id)) { |
| return; |
| } |
| this.dm.removeMembershipListener(this); |
| |
| ThreadGroup group = |
| LoggingThreadGroup.createThreadGroup("TXCommitMessage Threads", logger); |
| synchronized (this) { |
| if (isProcessing() || this.departureNoticed) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Member departed: Commit data is already being processed for lockid: {}", lockId); |
| } |
| return; |
| } |
| this.departureNoticed = true; |
| } |
| |
| // Send message to fellow FarSiders (aka recipients), if any, to |
| // determine if any one of them have received a CommitProcessMessage |
| if (this.farSiders != null && ! this.farSiders.isEmpty()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Member departed: {} sending query for CommitProcess message to other recipients.", id); |
| } |
| |
| // Create a new thread, send the CommitProcessQuery, wait for a response and potentially process |
| Thread fellowFarSidersQuery = new Thread(group, "CommitProcessQuery Thread") { |
| // Should I use a thread pool?, Darrel suggests look in DM somewhere or introduce a zero sized thread pool |
| @Override |
| public void run() { |
| final TXCommitMessage mess = TXCommitMessage.this; |
| Object trackerKey = mess.getTrackerKey(); |
| DistributedMember member = getMemberFromTrackerKey(trackerKey); |
| if (!mess.getSender().equals(member)) { |
| /* Do not send a CommitProcessQueryMessage when the sender of CommitMessage |
| * is not the member in the tracker key. (If this happens we are the redundant |
| * node for PR, and the primary just crashed). |
| */ |
| txTracker.removeMessage(mess); |
| return; |
| } |
| CommitProcessQueryReplyProcessor replProc = |
| new CommitProcessQueryReplyProcessor(mess.dm, mess.farSiders); |
| CommitProcessQueryMessage query = new CommitProcessQueryMessage(mess.getTrackerKey(), replProc.getProcessorId()); |
| query.setRecipients(mess.farSiders); |
| mess.dm.putOutgoing(query); |
| // Wait for any one positive response or all negative responses. |
| // (while() loop removed for bug 36983 - you can't loop on waitForReplies() |
| TXCommitMessage.this.dm.getCancelCriterion().checkCancelInProgress(null); |
| try { |
| replProc.waitForRepliesUninterruptibly(); |
| } catch (ReplyException e) { |
| e.handleAsUnexpected(); |
| } |
| if (replProc.receivedACommitProcessMessage()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Transaction associated with lockID: {} from orign {} is processing due to a received \"commit process\" message", |
| mess.lockId, id); |
| } |
| |
| try { |
| // Set processor to zero to avoid the ack to the now departed origin |
| mess.processorId = 0; |
| mess.basicProcess(); |
| } finally { |
| txTracker.processed(mess); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Transaction associated with lockID: {} from origin {} ignored. No other recipients received \"commit process\" message", |
| mess.lockId, id); |
| } |
| txTracker.removeMessage(mess); |
| } |
| |
| } |
| |
| private DistributedMember getMemberFromTrackerKey(Object trackerKey) { |
| if (trackerKey instanceof TXId) { |
| TXId id1 = (TXId)trackerKey; |
| return id1.getMemberId(); |
| } else if (trackerKey instanceof TXLockId) { |
| TXLockId id2 = (TXLockId)trackerKey; |
| return id2.getMemberId(); |
| } |
| return null; |
| } |
| }; |
| fellowFarSidersQuery.setDaemon(true); |
| fellowFarSidersQuery.start(); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Member departed: {}. Processing commit data.", getSender()); |
| } |
| |
| // Optimimal case where we are the only FarSider, assume we |
| // will never get the CommitProcess message, but it |
| // doesn't matter since we can commit anyway. |
| // Start a new thread to process the commit |
| Thread originDepartedCommit = new Thread(group, "Origin Departed Commit") { |
| @Override |
| public void run() { |
| final TXCommitMessage mess = TXCommitMessage.this; |
| try { |
| // Set processor to zero to avoid the ack to the now departed origin |
| mess.processorId = 0; |
| mess.basicProcess(); |
| } finally { |
| txTracker.processed(mess); |
| } |
| } |
| }; |
| originDepartedCommit.setDaemon(true); |
| originDepartedCommit.start(); |
| } |
| } |
| |
| void setUpdateLockMembers() { |
| this.lockNeedsUpdate = true; |
| } |
| |
| /** |
| * Intended to be called after TXState.applyChanges when the |
| * potential for a different set of TX members has been determined |
| * and it is safe to ignore any new members because the changes have |
| * been applied to committed state. This was added as the solution |
| * to bug 32999 and the recovery when the TXLock Lessor (the sending |
| * VM) crashes/departs before or while sending the TXCommitMessage. |
| * @see TXState#commit() |
| * @see com.gemstone.gemfire.internal.cache.locks.TXLockBatch#getBatchId() |
| */ |
| private void updateLockMembers() { |
| if (this.lockNeedsUpdate && this.lockId != null) { |
| TXLockService.createDTLS().updateParticipants(this.lockId, this.msgMap.keySet()); |
| } |
| } |
| |
| // /** Custom subclass that keeps all ReplyExceptions */ |
| // private class ReliableCommitReplyProcessor extends ReliableReplyProcessor21 { |
| // |
| // /** Set of members that threw CacheClosedExceptions */ |
| // private Set cacheExceptions = new HashSet(); |
| // /** key=region path, value=Set of members */ |
| // private Map regionExceptions = new HashMap(); |
| // |
| // public ReliableCommitReplyProcessor(DM dm, |
| // Set initMembers) { |
| // super(dm, initMembers); |
| // } |
| // protected synchronized void processException(DistributionMessage msg, |
| // ReplyException re) { |
| // // only interested in CommitReplyException |
| // if (re instanceof CommitReplyException) { |
| // CommitReplyException cre = (CommitReplyException) re; |
| // Set exceptions = cre.getExceptions(); |
| // for (Iterator iter = exceptions.iterator(); iter.hasNext();) { |
| // Exception ex = (Exception) iter.next(); |
| // if (ex instanceof CacheClosedException) { |
| // cacheExceptions.add(msg.getSender()); |
| // } |
| // else if (ex instanceof RegionDestroyedException) { |
| // String r = ((RegionDestroyedException)ex).getRegionFullPath(); |
| // Set members = (Set) regionExceptions.get(r); |
| // if (members == null) { |
| // members = new HashSet(); |
| // regionExceptions.put(r, members); |
| // } |
| // members.add(msg.getSender()); |
| // } |
| // } |
| // } |
| // else { |
| // // allow superclass to handle all other exceptions |
| // super.processException(msg, re); |
| // } |
| // } |
| // // these two accessors should be called after wait for replies completes |
| // protected Set getCacheClosedMembers() { |
| // return this.cacheExceptions; |
| // } |
| // protected Set getRegionDestroyedMembers(String regionFullPath) { |
| // Set members = (Set) this.regionExceptions.get(regionFullPath); |
| // if (members == null) { |
| // members = Collections.EMPTY_SET; |
| // } |
| // return members; |
| // } |
| // } |
| |
| /** |
| * Reply processor which collects all CommitReplyExceptions and emits |
| * a detailed failure exception if problems occur |
| * @author Mitch Thomas |
| * @since 5.7 |
| */ |
| private class CommitReplyProcessor extends ReliableReplyProcessor21 { |
| private HashMap msgMap; |
| |
| public CommitReplyProcessor(DM dm, |
| Set initMembers, HashMap msgMap) { |
| super(dm, initMembers); |
| this.msgMap = msgMap; |
| } |
| |
| public void waitForCommitCompletion() { |
| try { |
| waitForRepliesUninterruptibly(); |
| } |
| catch (CommitExceptionCollectingException e) { |
| e.handlePotentialCommitFailure(msgMap); |
| } |
| } |
| |
| @Override |
| protected void processException(DistributionMessage msg, |
| ReplyException ex) { |
| if (msg instanceof ReplyMessage) { |
| synchronized(this) { |
| if (this.exception == null) { |
| // Exception Container |
| this.exception = new CommitExceptionCollectingException(txIdent); |
| } |
| CommitExceptionCollectingException cce = (CommitExceptionCollectingException) this.exception; |
| if (ex instanceof CommitReplyException) { |
| CommitReplyException cre = (CommitReplyException) ex; |
| cce.addExceptionsFromMember(msg.getSender(), cre.getExceptions()); |
| } else { |
| cce.addExceptionsFromMember(msg.getSender(), Collections.singleton(ex)); |
| } |
| } |
| } |
| } |
| |
| @Override |
| protected boolean stopBecauseOfExceptions() { |
| return false; |
| } |
| |
| public Set getCacheClosedMembers() { |
| if (this.exception != null) { |
| CommitExceptionCollectingException cce = (CommitExceptionCollectingException) this.exception; |
| return cce.getCacheClosedMembers(); |
| } else { |
| return Collections.EMPTY_SET; |
| } |
| } |
| public Set getRegionDestroyedMembers(String regionFullPath) { |
| if (this.exception != null) { |
| CommitExceptionCollectingException cce = (CommitExceptionCollectingException) this.exception; |
| return cce.getRegionDestroyedMembers(regionFullPath); |
| } else { |
| return Collections.EMPTY_SET; |
| } |
| } |
| } |
| |
| /** |
| * An Exception that collects many remote CommitExceptions |
| * @author Mitch Thomas |
| * @since 5.7 |
| */ |
| public static class CommitExceptionCollectingException extends ReplyException { |
| private static final long serialVersionUID = 589384721273797822L; |
| /** Set of members that threw CacheClosedExceptions */ |
| private final Set<InternalDistributedMember> cacheExceptions; |
| /** key=region path, value=Set of members */ |
| private final Map<String, Set<InternalDistributedMember>> regionExceptions; |
| /** List of exceptions that were unexpected and caused the tx to fail */ |
| private final Map fatalExceptions; |
| |
| private final TXId id; |
| public CommitExceptionCollectingException(TXId txIdent) { |
| this.cacheExceptions = new HashSet(); |
| this.regionExceptions = new HashMap(); |
| this.fatalExceptions = new HashMap(); |
| this.id = txIdent; |
| } |
| |
| /** |
| * Determine if the commit processing was incomplete, if so throw a detailed |
| * exception indicating the source of the problem |
| * @param msgMap |
| */ |
| public void handlePotentialCommitFailure(HashMap<InternalDistributedMember, RegionCommitList> msgMap) { |
| if (fatalExceptions.size() > 0) { |
| StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ") |
| .append(id).append(". Caused by the following exceptions: "); |
| for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { |
| Map.Entry me = (Map.Entry) i.next(); |
| DistributedMember mem = (DistributedMember) me.getKey(); |
| errorMessage.append(" From member: ").append(mem).append(" "); |
| List exceptions = (List) me.getValue(); |
| for (Iterator ei = exceptions.iterator(); ei.hasNext(); ) |
| { |
| Exception e = (Exception) ei.next(); |
| errorMessage.append(e); |
| for (StackTraceElement ste : e.getStackTrace()) { |
| errorMessage.append("\n\tat ").append(ste); |
| } |
| if (ei.hasNext()) { |
| errorMessage.append("\nAND\n"); |
| } |
| } |
| errorMessage.append("."); |
| } |
| throw new CommitIncompleteException(errorMessage.toString()); |
| } |
| |
| //Mark any persistent members as offline |
| handleClosedMembers(msgMap); |
| handleRegionDestroyed(msgMap); |
| } |
| |
| /** |
| * Mark peers as offline for regions that the peer returned a RegionDestroyedException |
| */ |
| private void handleRegionDestroyed(HashMap<InternalDistributedMember, RegionCommitList> msgMap) { |
| if(regionExceptions == null || regionExceptions.isEmpty()) { |
| return; |
| } |
| |
| for(Map.Entry<InternalDistributedMember, RegionCommitList> memberMap: msgMap.entrySet()) { |
| InternalDistributedMember member = memberMap.getKey(); |
| RegionCommitList rcl = memberMap.getValue(); |
| for(RegionCommit region : rcl) { |
| Set<InternalDistributedMember> failedMembers = regionExceptions.get(region.r.getFullPath()); |
| if(failedMembers != null && failedMembers.contains(member)) { |
| markMemberOffline(member, region); |
| } |
| } |
| } |
| |
| } |
| |
| /** |
| * Mark peers as offline that returned a cache closed exception |
| */ |
| private void handleClosedMembers(HashMap<InternalDistributedMember, RegionCommitList> msgMap) { |
| for(InternalDistributedMember member: getCacheClosedMembers()) { |
| RegionCommitList rcl = msgMap.get(member); |
| |
| for(RegionCommit region : rcl) { |
| markMemberOffline(member, region); |
| } |
| } |
| } |
| |
| private void markMemberOffline(InternalDistributedMember member, |
| RegionCommit region) { |
| if(region.persistentIds == null) { |
| return; |
| } |
| |
| PersistentMemberID persistentId = region.persistentIds.get(member); |
| ///iterate over the list and mark the members offline |
| if(persistentId != null) { |
| //Fix for bug 42142 - In order for recovery to work, |
| //we must either |
| // 1) persistent the region operation successfully on the peer |
| // 2) record that the peer is offline |
| //or |
| // 3) fail the operation |
| |
| //if we have started to shutdown, we don't want to mark the peer |
| //as offline, or we will think we have newer data when in fact we don't |
| region.r.getCancelCriterion().checkCancelInProgress(null); |
| |
| //Otherwise, mark the peer as offline, because it didn't complete |
| //the operation. |
| ((DistributedRegion) region.r).getPersistenceAdvisor().markMemberOffline(member, persistentId); |
| } |
| } |
| |
| public Set<InternalDistributedMember> getCacheClosedMembers() { |
| return this.cacheExceptions; |
| } |
| public Set getRegionDestroyedMembers(String regionFullPath) { |
| Set members = (Set) this.regionExceptions.get(regionFullPath); |
| if (members == null) { |
| members = Collections.EMPTY_SET; |
| } |
| return members; |
| } |
| |
| /** |
| * Protected by (this) |
| * @param member |
| * @param exceptions |
| */ |
| public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { |
| for (Iterator iter = exceptions.iterator(); iter.hasNext();) { |
| Exception ex = (Exception) iter.next(); |
| if (ex instanceof CancelException) { |
| cacheExceptions.add(member); |
| } |
| else if (ex instanceof RegionDestroyedException) { |
| String r = ((RegionDestroyedException)ex).getRegionFullPath(); |
| Set<InternalDistributedMember> members = regionExceptions.get(r); |
| if (members == null) { |
| members = new HashSet(); |
| regionExceptions.put(r, members); |
| } |
| members.add(member); |
| } else { |
| List el = (List) this.fatalExceptions.get(member); |
| if (el == null) { |
| el = new ArrayList(2); |
| this.fatalExceptions.put(member, el); |
| } |
| el.add(ex); |
| } |
| } |
| } |
| } |
| |
| public void hookupRegions(DM dm) { |
| if(regions!=null) { |
| Iterator it = regions.iterator(); |
| while(it.hasNext()) { |
| RegionCommit rc = (RegionCommit)it.next(); |
| rc.hookupRegion(dm); |
| } |
| } |
| |
| } |
| |
| |
| /** |
| * Disable firing of TX Listeners. Currently on used on clients. |
| * @param b disable the listeners |
| */ |
| public void setDisableListeners(boolean b) { |
| disableListeners = true; |
| } |
| |
| public Version getClientVersion() { |
| return clientVersion; |
| } |
| public void setClientVersion(Version clientVersion) { |
| this.clientVersion = clientVersion; |
| } |
| |
| } |