/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheRuntimeException;
import org.apache.geode.cache.CommitDistributionException;
import org.apache.geode.cache.CommitIncompleteException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionDistributionException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.TransactionListener;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReliableReplyProcessor21;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.cache.locks.TXLockId;
import org.apache.geode.internal.cache.locks.TXLockIdImpl;
import org.apache.geode.internal.cache.locks.TXLockService;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * TXCommitMessage is the message that contains all the information that needs to be distributed, on
 * commit, to other cache members.
 *
 * @since GemFire 4.0
 */
public class TXCommitMessage extends PooledDistributionMessage
    implements MembershipListener, MessageWithReply {

  private static final Logger logger = LogService.getLogger();

  // Keep a 60 second history @ an estimated 1092 transactions/second ~= 16^4
  @MakeNotStatic
  protected static final TXFarSideCMTracker txTracker = new TXFarSideCMTracker((60 * 1092));

  private ArrayList<RegionCommit> regions; // list of RegionCommit instances
  protected TXId txIdent;
  protected int processorId; // 0 unless needsAck is true
  protected TXLockIdImpl lockId;
  protected HashSet farSiders;
  protected transient DistributionManager dm; // Used on the sending side of this message
  private transient int sequenceNum = 0;

  // Maps each receiving member to the RegionCommitList instance that will be sent to it
  private transient HashMap<InternalDistributedMember, RegionCommitList> msgMap = null;

  private transient RegionCommit currentRegion;
  protected transient TXState txState = null;
  private transient boolean wasProcessed;
  private transient boolean isProcessing;
  private transient boolean dontProcess;
  private transient boolean departureNoticed = false;
  private transient boolean lockNeedsUpdate = false;
  private transient boolean ackRequired = true;
  /**
   * List of operations to do when processing this tx. Valid on farside only.
   */
  protected transient ArrayList farSideEntryOps;
  private byte[] farsideBaseMembershipId; // only available on farside
  private long farsideBaseThreadId; // only available on farside
  private long farsideBaseSequenceId; // only available on farside

  /**
   * (Nearside) true if any regions in this TX have required roles
   */
  private transient boolean hasReliableRegions = false;

  /**
   * Set of all caching exceptions produced while processing this tx
   */
  private transient Set processingExceptions = Collections.emptySet();

  private ClientProxyMembershipID bridgeContext = null;

  /**
   * Version of the client that this TXCommitMessage is being sent to. Used for backwards
   * compatibility
   */
  private transient Version clientVersion;

  /**
   * A token to be put in TXManagerImpl#failoverMap to represent a CommitConflictException while
   * committing a transaction
   */
  @Immutable
  public static final TXCommitMessage CMT_CONFLICT_MSG = new TXCommitMessage();
  /**
   * A token to be put in TXManagerImpl#failoverMap to represent a
   * TransactionDataNodeHasDepartedException
   * while committing a transaction
   */
  @Immutable
  public static final TXCommitMessage REBALANCE_MSG = new TXCommitMessage();
  /**
   * A token to be put in TXManagerImpl#failoverMap to represent an exception while committing a
   * transaction
   */
  @Immutable
  public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage();
  /**
   * A token to be put in TXManagerImpl#failoverMap to represent a rolled back transaction
   */
  @Immutable
  public static final TXCommitMessage ROLLBACK_MSG = new TXCommitMessage();

  /**
   * Creates a commit message for the given transaction.
   *
   * @param txIdent identifier of the transaction being committed
   * @param dm distribution manager, used on the sending side of this message
   * @param txState transaction state; its {@code bridgeContext} is copied into this message
   */
  public TXCommitMessage(TXId txIdent, DistributionManager dm, TXState txState) {
    // Identity and collaborators supplied by the caller.
    this.txIdent = txIdent;
    this.dm = dm;
    this.txState = txState;

    // Nothing has been locked or collected yet.
    lockId = null;
    regions = null;
    farSiders = null;

    // All processing flags start cleared.
    wasProcessed = false;
    isProcessing = false;
    dontProcess = false;

    bridgeContext = txState.bridgeContext;
  }

  /**
   * Zero-argument constructor needed for deserialization through DataSerializer. It is also the
   * constructor used for the static token instances declared on this class (for example
   * {@code CMT_CONFLICT_MSG} and {@code ROLLBACK_MSG}).
   */
  public TXCommitMessage() {
    // zero arg constructor for DataSerializer
  }

  /**
   * Returns the far-side commit message tracker held in the static {@code txTracker} field.
   *
   * @return the shared tracker instance
   */
  public static TXFarSideCMTracker getTracker() {
    return txTracker;
  }

  /**
   * Builds the event id that sits {@code eventOffset} positions after the far-side base sequence
   * id, reusing the far-side base membership id and base thread id.
   *
   * @param eventOffset amount added to the far-side base sequence id
   * @return a new EventID for that offset
   * @since GemFire 5.7
   */
  protected EventID getEventId(int eventOffset) {
    final long sequenceId = this.farsideBaseSequenceId + eventOffset;
    return new EventID(this.farsideBaseMembershipId, this.farsideBaseThreadId, sequenceId);
  }

  /**
   * Looks up the already-received TXCommitMessage associated with {@code id} in the shared
   * tracker. Because of bug 37657 the lookup may need to wait for the message to show up.
   *
   * @param id key the message is tracked under
   * @param dm distribution manager handed through to the tracker
   * @return the result of the tracker's {@code waitForMessage} call
   */
  public static TXCommitMessage waitForMessage(Object id, DistributionManager dm) {
    return getTracker().waitForMessage(id, dm);
  }

  /**
   * Starts collecting operations for region {@code r} by installing a fresh RegionCommit as the
   * current region. Also notes when the region requires a reliability check so that
   * {@code send} knows to verify distribution afterwards.
   *
   * @param r region whose changes are about to be added
   * @param maxSize value forwarded to the RegionCommit constructor
   */
  void startRegion(InternalRegion r, int maxSize) {
    final RegionCommit started = new RegionCommit(this, r, maxSize);
    this.currentRegion = started;

    final boolean needsReliabilityCheck = r.requiresReliabilityCheck();
    if (needsReliabilityCheck) {
      this.hasReliableRegions = true;
    }
  }

  /**
   * Finishes the current region for the send path and then clears it.
   *
   * <p>
   * When the current region has changes and {@code s} names at least one member, this records
   * the region's persistent ids and attaches the current RegionCommit to the commit list of
   * every member of {@code s} that the distribution manager still lists. Members that had no
   * list before this call all share one newly created RegionCommitList. Finally, every
   * previously known recipient that is not in {@code s} has its list replaced by the result of
   * {@code RegionCommitList.trim(currentRegion)} when that result is a different instance.
   *
   * @param s the members that should receive the current region's changes; may be null or empty
   */
  void finishRegion(Set<InternalDistributedMember> s) {
    // make sure we have some changes and someone to send them to
    final boolean somethingToSend = !this.currentRegion.isEmpty() && s != null && !s.isEmpty();
    if (somethingToSend) {
      // Get the persistent ids for the current region and save them
      this.currentRegion.persistentIds = getPersistentIds(this.currentRegion.internalRegion);

      if (this.msgMap == null) {
        this.msgMap = new HashMap<>();
      }

      // One list shared by every recipient that is new to msgMap in this call
      RegionCommitList sharedNewList = null;
      for (InternalDistributedMember recipient : s) {
        if (!this.dm.getDistributionManagerIds().contains(recipient)) {
          // skip this member since the dm no longer knows about it
          if (logger.isDebugEnabled()) {
            logger.debug("Skipping member {} due to dist list absence", recipient);
          }
          continue;
        }

        final RegionCommitList existing = this.msgMap.get(recipient);
        if (existing != null) {
          // Avoid adding the region twice when the list is shared between recipients
          if (existing.get(existing.size() - 1) != this.currentRegion) {
            existing.add(this.currentRegion);
          }
          continue;
        }

        if (sharedNewList == null) {
          sharedNewList = new RegionCommitList();
          sharedNewList.add(this.currentRegion);
        }
        this.msgMap.put(recipient, sharedNewList);
      }

      // Now deal with each existing recipient that does not care
      // about this region
      for (Map.Entry<InternalDistributedMember, RegionCommitList> known : this.msgMap
          .entrySet()) {
        if (s.contains(known.getKey())) {
          continue;
        }
        final RegionCommitList before = known.getValue();
        final RegionCommitList after = before.trim(this.currentRegion);
        if (after != before) {
          known.setValue(after);
        }
      }
    }
    this.currentRegion = null;
  }

  /**
   * Returns the persistent members advised for {@code r}.
   *
   * @param r the region to inspect
   * @return the result of {@code advisePersistentMembers()} on the region's cache distribution
   *         advisor when {@code r} is a DistributedRegion, otherwise an empty map
   */
  private Map<InternalDistributedMember, PersistentMemberID> getPersistentIds(InternalRegion r) {
    if (!(r instanceof DistributedRegion)) {
      return Collections.emptyMap();
    }
    final CacheDistributionAdvisee advisee = (CacheDistributionAdvisee) r;
    return advisee.getCacheDistributionAdvisor().advisePersistentMembers();
  }

  /**
   * Finishes the current region by appending it to {@code regions} (creating the list on first
   * use) when it has changes, then clears the current region.
   */
  void finishRegionComplete() {
    final RegionCommit finished = this.currentRegion;
    // make sure we have some changes worth keeping
    if (!finished.isEmpty()) {
      if (this.regions == null) {
        this.regions = new RegionCommitList();
      }
      this.regions.add(finished);
    }
    this.currentRegion = null;
  }

  // Maps each DistributedRegion to the view version recorded through addViewVersion; the
  // entries are walked by releaseViewVersions to end the corresponding operations.
  Map viewVersions = new HashMap();

  // Populated from the stream in fromData; written back by toData when there is no txState.
  private Boolean needsLargeModCount;

  // When true, basicProcess does not invoke TransactionListener.afterCommit.
  private transient boolean disableListeners = false;

  /**
   * Remembers the CacheDistributionAdvisor.startOperation version obtained for {@code dr} so
   * the operation can be ended later during cleanup.
   *
   * @param dr the region the version belongs to
   * @param version the version to remember for that region
   */
  protected void addViewVersion(DistributedRegion dr, long version) {
    this.viewVersions.put(dr, Long.valueOf(version));
  }

  /**
   * Ends the advisor operation for every region/version pair recorded in {@code viewVersions}.
   * A failure on one region does not stop the remaining regions from being released; once all
   * have been attempted, the most recently caught RuntimeException (if any) is rethrown.
   */
  protected void releaseViewVersions() {
    RuntimeException lastFailure = null;
    for (Object rawEntry : viewVersions.entrySet()) {
      final Map.Entry entry = (Map.Entry) rawEntry;
      final DistributedRegion region = (DistributedRegion) entry.getKey();
      final Long version = (Long) entry.getValue();
      // keep going if one of the regions is destroyed since others may still be okay
      try {
        region.getDistributionAdvisor().endOperation(version);
      } catch (RuntimeException ex) {
        lastFailure = ex;
      }
    }
    if (lastFailure != null) {
      throw lastFailure;
    }
  }

  /**
   * Tells whether there is nothing to distribute, i.e. the recipient map was never created or
   * holds no entries.
   */
  private boolean isEmpty() {
    final HashMap<InternalDistributedMember, RegionCommitList> recipients = this.msgMap;
    if (recipients == null) {
      return true;
    }
    return recipients.isEmpty();
  }

  /**
   * Adds the operation described by {@code key} and {@code entry} to the current region.
   * The {@code r} and {@code otherRecipients} arguments are not used by this implementation.
   */
  void addOp(InternalRegion r, Object key, TXEntryState entry, Set otherRecipients) {
    final RegionCommit target = this.currentRegion;
    target.addOp(key, entry);
  }

  /**
   * Distributes this commit to the recipients collected in {@code msgMap}.
   *
   * <p>
   * Returns immediately when there is nothing to distribute. Otherwise recipients are grouped
   * by the identity of their RegionCommitList, this message is sent once per group, and, when
   * at least one list needs an ack, a commit-process message is sent to the ack receivers and
   * the call waits for commit completion. If any region requires a reliability check, the
   * distribution is verified at the end.
   *
   * @param lockId the lock id for this transaction; may be null, in which case the
   *        commit-process message is keyed by the transaction id instead
   */
  void send(TXLockId lockId) {
    if (isEmpty()) {
      if (logger.isDebugEnabled()) {
        logger.debug("empty transaction - nothing to distribute");
      }
      return;
    }
    Assert.assertTrue(this.txState != null, "Send must have transaction state.");
    this.lockId = (TXLockIdImpl) lockId;
    updateLockMembers();

    IdentityHashMap distMap = new IdentityHashMap(); // Map of RegionCommitList keys to Sets of
    // receivers
    HashSet ackReceivers = null;
    {
      // Group the recipients by RegionCommitList instance and collect those that must ack
      Iterator it = this.msgMap.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry me = (Map.Entry) it.next();
        RegionCommitList rcl = (RegionCommitList) me.getValue();
        if (rcl.getNeedsAck()) {
          if (ackReceivers == null) {
            ackReceivers = new HashSet();
          }
          ackReceivers.add(me.getKey());
        }
        HashSet receivers = (HashSet) distMap.get(rcl);
        if (receivers == null) {
          receivers = new HashSet();
          distMap.put(rcl, receivers);
        }
        receivers.add(me.getKey());
      }
    }

    CommitReplyProcessor processor = null;
    {
      if (ackReceivers != null) {
        processor = new CommitReplyProcessor(this.dm, ackReceivers, msgMap);
        // farSiders is only populated when more than one member has to ack
        if (ackReceivers.size() > 1) {
          this.farSiders = ackReceivers;
        }
        processor.enableSevereAlertProcessing();
      }
      {
        Iterator it = distMap.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry me = (Map.Entry) it.next();
          RegionCommitList rcl = (RegionCommitList) me.getKey();
          HashSet recipients = (HashSet) me.getValue();
          // now remove from the recipients any recipients that the dm no
          // longer knows about
          recipients.retainAll(this.dm.getDistributionManagerIds());
          if (!recipients.isEmpty()) {
            if (this.txState.internalDuringIndividualSend != null) {
              // Run in test mode, splitting out individual recipients,
              // so we can control who gets what
              Iterator indivRecip = recipients.iterator();
              while (indivRecip.hasNext()) {
                this.txState.internalDuringIndividualSend.run();
                setRecipientsSendData(Collections.singleton(indivRecip.next()), processor, rcl);
              }
            } else {
              // Run in normal mode sending to multiple recipients in
              // one shot
              setRecipientsSendData(recipients, processor, rcl);
            }
          }
        }
      }
      if (this.txState.internalAfterIndividualSend != null) {
        this.txState.internalAfterIndividualSend.run();
      }
    }

    if (processor != null) {
      // Send the CommitProcessMessage
      final CommitProcessMessage cpMsg;
      if (this.lockId != null) {
        cpMsg = new CommitProcessForLockIdMessage(this.lockId);
      } else {
        cpMsg = new CommitProcessForTXIdMessage(this.txIdent);
      }
      if (this.txState.internalDuringIndividualCommitProcess != null) {
        // Run in test mode
        Iterator<InternalDistributedMember> indivRecip = ackReceivers.iterator();
        while (indivRecip.hasNext()) {
          this.txState.internalDuringIndividualCommitProcess.run();
          cpMsg.setRecipients(Collections.<InternalDistributedMember>singleton(indivRecip.next()));
          this.dm.putOutgoing(cpMsg);
          cpMsg.resetRecipients();
        }
      } else {
        // Run in normal mode
        cpMsg.setRecipients(ackReceivers);
        this.dm.putOutgoing(cpMsg);
      }

      if (this.txState.internalAfterIndividualCommitProcess != null) {
        // Testing callback
        this.txState.internalAfterIndividualCommitProcess.run();
      }

      // for() loop removed for bug 36983 - you can't loop on waitForReplies()
      dm.getCancelCriterion().checkCancelInProgress(null);
      processor.waitForCommitCompletion();
      this.dm.getStats().incCommitWaits();
    }
    if (this.hasReliableRegions) {
      checkDistributionReliability(distMap, processor);
    }
  }

  /**
   * Always returns {@code true} for this message type.
   */
  @Override
  public boolean containsRegionContentChange() {
    return true;
  }

  /**
   * Runs the reliable-distribution check for every region in this transaction that requires
   * one, and reports all failing regions together in a single exception.
   *
   * @param distMap map of RegionCommitList keys to Sets of receivers
   * @param processor the reply processor; may be null, in which case no member is treated as
   *        departed, cache-closed or region-destroyed
   * @throws CommitDistributionException if any required roles may not have received the commit
   *         message
   */
  private void checkDistributionReliability(Map distMap, CommitReplyProcessor processor) {
    // key=RegionCommit, value=Set of recipients
    final Map recipientsByRegion = new IdentityHashMap();

    // register every region needing a reliability check along with all of its receivers
    for (Object distObj : distMap.entrySet()) {
      final Map.Entry distEntry = (Map.Entry) distObj;
      final RegionCommitList commitList = (RegionCommitList) distEntry.getKey();
      final Set receivers = (Set) distEntry.getValue();

      for (Object commitObj : commitList) {
        final RegionCommit commit = (RegionCommit) commitObj;
        // skip region if no required roles
        if (!commit.internalRegion.requiresReliabilityCheck()) {
          continue;
        }

        Set regionReceivers = (Set) recipientsByRegion.get(commit);
        if (regionReceivers == null) {
          regionReceivers = new HashSet();
          recipientsByRegion.put(commit, regionReceivers);
        }
        if (receivers != null) {
          regionReceivers.addAll(receivers);
        }
      }
    }

    final Set cacheClosedMembers;
    final Set departedMembers;
    if (processor == null) {
      cacheClosedMembers = Collections.emptySet();
      departedMembers = Collections.emptySet();
    } else {
      cacheClosedMembers = processor.getCacheClosedMembers();
      departedMembers = processor.getDepartedMembers();
    }

    // check reliability on each region; the failure sets are created on the first failure
    Set distributionFailures = null;
    Set failedRegionNames = null;
    for (Object regionObj : recipientsByRegion.entrySet()) {
      final RegionCommit commit = (RegionCommit) ((Map.Entry) regionObj).getKey();

      final Set successfulRecipients = new HashSet(msgMap.keySet());
      successfulRecipients.removeAll(departedMembers);

      // remove members who destroyed that region or closed their cache
      final Set regionDestroyedMembers = (processor == null) ? Collections.emptySet()
          : processor.getRegionDestroyedMembers(commit.internalRegion.getFullPath());

      successfulRecipients.removeAll(cacheClosedMembers);
      successfulRecipients.removeAll(regionDestroyedMembers);

      try {
        commit.internalRegion.handleReliableDistribution(successfulRecipients);
      } catch (RegionDistributionException e) {
        if (distributionFailures == null) {
          distributionFailures = new HashSet();
          failedRegionNames = new HashSet();
        }
        distributionFailures.add(e);
        failedRegionNames.add(commit.internalRegion.getFullPath());
      }
    }

    if (distributionFailures != null) {
      final String message = String.format(
          "These regions experienced reliability failure during distribution of the operation: %s",
          failedRegionNames);
      throw new CommitDistributionException(message, distributionFailures);
    }
  }

  /**
   * Helper for {@code send}: points this message at {@code recipients}, installs {@code rcl} as
   * the regions payload, sends the message through the distribution manager and resets the
   * recipients afterwards.
   *
   * @param recipients members to send to
   * @param processor reply processor whose id is used when {@code rcl} needs an ack; it is not
   *        consulted otherwise
   * @param rcl the region commits to ship to these recipients
   */
  private void setRecipientsSendData(Set recipients, ReplyProcessor21 processor,
      RegionCommitList rcl) {
    setRecipients(recipients);
    this.regions = rcl;
    // a processor id of 0 is used when no ack is needed
    this.processorId = rcl.getNeedsAck() ? processor.getProcessorId() : 0;
    this.dm.getStats().incSentCommitMessages(1L);
    this.sequenceNum++;
    this.dm.putOutgoing(this);
    resetRecipients();
  }

  /**
   * Handles this message on the receiving member. When the processor id is 0 the commit is
   * processed right away. Otherwise the message is registered with the tracker and processing
   * is deferred until a CommitProcess message arrives; the sender's departure is handled
   * immediately if the distribution manager no longer lists it.
   */
  @Override
  protected void process(ClusterDistributionManager dm) {
    this.dm = dm;
    // Remove this node from the set of recipients
    if (this.farSiders != null) {
      this.farSiders.remove(dm.getId());
    }

    if (this.processorId == 0) {
      basicProcess();
      return;
    }

    TXLockService.createDTLS(this.dm.getSystem()); // fix bug 38843; no-op if already created
    synchronized (this) {
      // Handle potential origin departure
      this.dm.addMembershipListener(this);
      // Assume ACK mode, defer processing until we receive a
      // CommitProcess message
      if (logger.isDebugEnabled()) {
        final Object key = getTrackerKey();
        logger.debug("Adding key:{} class{} to tracker list", key, key.getClass().getName());
      }
      txTracker.add(this);
    }

    final boolean senderStillKnown = this.dm.getDistributionManagerIds().contains(getSender());
    if (!senderStillKnown) {
      memberDeparted(this.dm, getSender(), false /* don't care */);
    }
  }

  /**
   * Appends {@code entryOp} to the list of operations this tx performs on the far side.
   *
   * @param entryOp the far-side operation to queue
   */
  void addFarSideEntryOp(RegionCommit.FarSideEntryOp entryOp) {
    final ArrayList queuedOps = this.farSideEntryOps;
    queuedOps.add(entryOp);
  }

  /**
   * Records an exception raised while processing this tx. A fresh set is started for the first
   * recorded exception, and also whenever {@code e} is a CancelException, which discards
   * anything recorded earlier.
   *
   * @param e the exception to record
   */
  protected void addProcessingException(Exception e) {
    final boolean nothingRecordedYet = this.processingExceptions == Collections.emptySet();
    final boolean startFresh = nothingRecordedYet || e instanceof CancelException;
    if (startFresh) {
      this.processingExceptions = new HashSet();
    }
    this.processingExceptions.add(e);
  }

  /**
   * Replaces the distribution manager used by this message.
   *
   * @param dm the distribution manager to use from now on
   */
  public void setDM(DistributionManager dm) {
    this.dm = dm;
  }

  /**
   * Applies this commit on the receiving member.
   *
   * <p>
   * Only the first caller processes the message; later callers return immediately. Each region
   * is pre-processed (regions that fail are dropped), the far-side entry operations are applied,
   * every remaining region is post-processed, and the transaction listeners' afterCommit is
   * invoked when appropriate. Whatever happens, the thread's init-level requirement is restored
   * and an ack is sent when one is required.
   *
   * <p>
   * The cache lookup in the final cleanup is null-guarded: the method already treats a missing
   * cache as possible, and an unguarded dereference there would throw a NullPointerException
   * out of the finally block.
   */
  public void basicProcess() {
    final DistributionManager dm = this.dm;

    synchronized (this) {
      if (isProcessing()) {
        if (logger.isDebugEnabled()) {
          logger.debug("TXCommitMessage {} is already in process, returning", this);
        }
        return;
      } else {
        setIsProcessing(true);
      }
    }

    if (logger.isDebugEnabled()) {
      logger.debug("begin processing TXCommitMessage for {}", this.txIdent);
    }
    final InitializationLevel oldLevel =
        LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
    boolean forceListener = false; // this gets flipped if we need to fire tx listener
    // it needs to default to false because we don't want to fire listeners on pr replicates
    try {
      TXRmtEvent txEvent = null;
      final Cache cache = dm.getExistingCache();
      if (cache == null) {
        addProcessingException(new CacheClosedException());
        // return ... this cache is closed so we can't do anything.
        return;
      }
      final TransactionListener[] tls = cache.getCacheTransactionManager().getListeners();
      // The remote event is only needed when there is someone to deliver it to
      if (tls.length > 0) {
        txEvent = new TXRmtEvent(this.txIdent, cache);
      }
      try {
        // Pre-process each Region in the tx
        try {
          Iterator it = this.regions.iterator();
          while (it.hasNext()) {
            boolean failedBeginProcess = true;
            RegionCommit rc = (RegionCommit) it.next();
            try {
              failedBeginProcess = !rc.beginProcess(dm, this.txIdent, txEvent);
            } catch (CacheRuntimeException problem) {
              processCacheRuntimeException(problem);
            } finally {
              if (failedBeginProcess) {
                rc.internalRegion = null; // Cause related FarSideEntryOps to skip processing
                it.remove(); // Skip endProcessing as well
              }
            }
          }
          basicProcessOps();
        } finally { // fix for bug 40001
          // post-process each Region in the tx
          Iterator it = this.regions.iterator();
          while (it.hasNext()) {
            try {
              RegionCommit rc = (RegionCommit) it.next();
              rc.endProcess();
              if (rc.isForceFireEvent(dm)) {
                forceListener = true;
              }
            } catch (CacheRuntimeException problem) {
              processCacheRuntimeException(problem);
            }
          }
        }

        /*
         * We need to make sure that we should fire a TX afterCommit event.
         */
        boolean internalEvent = (txEvent != null && txEvent.hasOnlyInternalEvents());
        if (!disableListeners && !internalEvent
            && (forceListener || (txEvent != null && !txEvent.isEmpty()))) {
          for (int i = 0; i < tls.length; i++) {
            try {
              tls[i].afterCommit(txEvent);
            } catch (VirtualMachineError err) {
              SystemFailure.initiateFailure(err);
              // If this ever returns, rethrow the error. We're poisoned
              // now, so don't let this thread continue.
              throw err;
            } catch (Throwable t) {
              // Whenever you catch Error or Throwable, you must also
              // catch VirtualMachineError (see above). However, there is
              // _still_ a possibility that you are dealing with a cascading
              // error condition, so you also need to check to see if the JVM
              // is still usable:
              SystemFailure.checkFailure();
              logger.error("Exception occurred in TransactionListener",
                  t);
            }
          }
        }
      } catch (CancelException e) {
        processCacheRuntimeException(e);
      } finally {
        if (txEvent != null) {
          txEvent.freeOffHeapResources();
        }
      }
    } finally {
      LocalRegion.setThreadInitLevelRequirement(oldLevel);
      if (isAckRequired()) {
        ack();
      }
      // The cache can be missing (the try block above bails out on a null cache), so guard the
      // lookup rather than dereferencing it blindly from inside this finally block.
      if (dm.getExistingCache() != null && !dm.getExistingCache().isClient()
          && bridgeContext != null) {
        getTracker().saveTXForClientFailover(txIdent, this);
      }
      if (logger.isDebugEnabled()) {
        logger.debug("completed processing TXCommitMessage for {}", this.txIdent);
      }
    }
  }

  /**
   * Sorts the far-side entry operations, processes them in that order, and then fires the
   * callbacks they queued. A failure in one operation is recorded and does not stop the
   * remaining operations.
   */
  public void basicProcessOps() {
    final List<EntryEventImpl> queuedCallbacks = new ArrayList<>(this.farSideEntryOps.size());
    Collections.sort(this.farSideEntryOps);
    for (Iterator ops = this.farSideEntryOps.iterator(); ops.hasNext();) {
      try {
        // next() stays inside the try so its failures are recorded like any other
        final RegionCommit.FarSideEntryOp op = (RegionCommit.FarSideEntryOp) ops.next();
        op.process(queuedCallbacks);
      } catch (CacheRuntimeException problem) {
        processCacheRuntimeException(problem);
      } catch (Exception e) {
        addProcessingException(e);
      }
    }
    firePendingCallbacks(queuedCallbacks);
  }

  /**
   * Invokes the TX callbacks on each event's region, choosing the listener event from the
   * event's operation (destroy, invalidate, create, otherwise update). Every event is released
   * afterwards, even when its callback throws.
   *
   * @param callbacks the events whose callbacks are pending
   */
  private void firePendingCallbacks(List<EntryEventImpl> callbacks) {
    for (EntryEventImpl event : callbacks) {
      try {
        final EnumListenerEvent listenerEvent;
        if (event.getOperation().isDestroy()) {
          listenerEvent = EnumListenerEvent.AFTER_DESTROY;
        } else if (event.getOperation().isInvalidate()) {
          listenerEvent = EnumListenerEvent.AFTER_INVALIDATE;
        } else if (event.getOperation().isCreate()) {
          listenerEvent = EnumListenerEvent.AFTER_CREATE;
        } else {
          listenerEvent = EnumListenerEvent.AFTER_UPDATE;
        }
        event.getRegion().invokeTXCallbacks(listenerEvent, event, true);
      } finally {
        event.release();
      }
    }
  }

  /**
   * Records {@code problem} as a processing exception and then reacts to its type: a
   * RegionDestroyedException is only recorded, a CancelException is rethrown, and anything else
   * is logged as an error.
   *
   * @param problem the exception raised while processing this message
   */
  protected void processCacheRuntimeException(CacheRuntimeException problem) {
    // every kind of problem is recorded first
    addProcessingException(problem);

    if (problem instanceof RegionDestroyedException) {
      return;
    }
    if (problem instanceof CancelException) {
      throw problem;
    }
    logger.error(
        "Transaction message {} from sender {} failed processing, unknown transaction state: {}",
        new Object[] {this, getSender(), problem});
  }

  /**
   * Replies to the sender when this message carries a non-zero processor id. Any exceptions
   * recorded during processing are wrapped in a CommitReplyException and sent with the reply.
   */
  private void ack() {
    if (this.processorId == 0) {
      return;
    }
    CommitReplyException replyEx = null;
    if (!this.processingExceptions.isEmpty()) {
      final String summary = String.format(
          "Commit operation generated one or more exceptions from %s", this.getSender());
      replyEx = new CommitReplyException(summary, this.processingExceptions);
    }
    ReplyMessage.send(getSender(), this.processorId, replyEx, this.dm);
  }

  /**
   * Returns the serialization id of this message, which is always {@code TX_COMMIT_MESSAGE}.
   * The version-dependent alternative described in the comments below is commented out.
   */
  @Override
  public int getDSFID() {
    // on near side send old TX_COMMIT_MESSAGE if there is at least one 7.0
    // member in the system, otherwise send the new 7.0.1 message.
    // 7.0.1 members will be able to deserialize either
    // if (shouldSend701Message()) {
    // this.shouldWriteShadowKey = true;
    // return TX_COMMIT_MESSAGE_701;
    return TX_COMMIT_MESSAGE;
    /*
     * } this.shouldWriteShadowKey = false; return TX_COMMIT_MESSAGE;
     */
  }

  /*
   * /** Do not send shadowKey to clients or when there are member(s) older than 7.0.1.
   *
   * private boolean shouldSend701Message() { if (this.clientVersion == null &&
   * this.getDM().getMembersWithOlderVersion("7.0.1").isEmpty()) { return true; } return false; }
   *
   * public boolean shouldReadShadowKey() { return this.shouldReadShadowKey; }
   *
   * public void setShouldReadShadowKey(boolean shouldReadShadowKey) { this.shouldReadShadowKey =
   * shouldReadShadowKey; }
   *
   * public boolean shouldWriteShadowKey() { return this.shouldWriteShadowKey; }
   */

  /**
   * Reads this message from {@code in}. The fields are consumed in the order written by
   * {@code toData}: processor id, transaction id, optional lock id, total max size, the far-side
   * base membership/thread/sequence ids, the large-mod-count flag, the shadow-key flag (only
   * when the stream version has the flags field), the region commits, the bridge context and
   * the far-siders set.
   *
   * <p>
   * If a region commit raises a CacheClosedException while being read, the exception is
   * recorded and reading stops at that point.
   */
  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    final int wireProcessorId = in.readInt();
    if (!isAckRequired()) {
      this.processorId = -1;
    } else {
      this.processorId = wireProcessorId;
      ReplyProcessor21.setMessageRPId(this.processorId);
    }

    this.txIdent = TXId.createFromData(in);
    final boolean hasLockId = in.readBoolean();
    if (hasLockId) {
      this.lockId = TXLockIdImpl.createFromData(in);
    }
    final int totalMaxSize = in.readInt();

    this.farsideBaseMembershipId = DataSerializer.readByteArray(in);
    this.farsideBaseThreadId = in.readLong();
    this.farsideBaseSequenceId = in.readLong();

    this.needsLargeModCount = in.readBoolean();

    // streams without the flags field fall back to the locally computed value
    final boolean hasShadowKeys;
    if (hasFlagsField(in)) {
      hasShadowKeys = in.readBoolean();
    } else {
      hasShadowKeys = useShadowKey();
    }

    final int regionsSize = in.readInt();
    this.regions = new ArrayList(regionsSize);
    this.farSideEntryOps = new ArrayList(totalMaxSize);
    int regionsRead = 0;
    while (regionsRead < regionsSize) {
      final RegionCommit commit = new RegionCommit(this);
      try {
        commit.fromData(in, hasShadowKeys);
      } catch (CacheClosedException cce) {
        addProcessingException(cce);
        // return to avoid serialization error being sent in reply
        return;
      }
      this.regions.add(commit);
      regionsRead++;
    }

    this.bridgeContext = ClientProxyMembershipID.readCanonicalized(in);
    this.farSiders = DataSerializer.readHashSet(in);
  }

  /**
   * Return true if a distributed ack message is required. On the client side of a transaction, this
   * returns false, while returning true elsewhere. The value is whatever was last passed to
   * {@code setAckRequired}, and is true when that method was never called.
   *
   * @return requires ack message or not
   */
  private boolean isAckRequired() {
    return this.ackRequired;
  }


  /**
   * Sets whether an ack is required; the default is true. Turning acks off also sets the
   * processor id to -1.
   *
   * @param a true if we require an ack. false if not. false on clients.
   */
  public void setAckRequired(boolean a) {
    this.ackRequired = a;
    if (a) {
      return;
    }
    this.processorId = -1;
  }

  /**
   * Writes this message to {@code out}. The order is: processor id, transaction id, optional
   * lock id, the summed max size of all regions, the base membership/thread/sequence ids and
   * large-mod-count flag (from {@code txState} when present, otherwise from the values held on
   * this message), the shadow-key flag (only when the stream version has the flags field), the
   * region commits, the bridge context and the far-siders set.
   */
  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    out.writeInt(this.processorId);
    InternalDataSerializer.invokeToData(this.txIdent, out);

    final boolean hasLockId = this.lockId != null;
    out.writeBoolean(hasLockId);
    if (hasLockId) {
      InternalDataSerializer.invokeToData(this.lockId, out);
    }

    int regionsSize = 0;
    int totalMaxSize = 0;
    if (this.regions != null) {
      regionsSize = this.regions.size();
      for (RegionCommit commit : this.regions) {
        totalMaxSize += commit.maxSize;
      }
    }
    out.writeInt(totalMaxSize);

    final boolean hasTxState = this.txState != null;
    if (hasTxState) {
      DataSerializer.writeByteArray(this.txState.getBaseMembershipId(), out);
      out.writeLong(this.txState.getBaseThreadId());
      out.writeLong(this.txState.getBaseSequenceId());
      DataSerializer.writeBoolean(this.txState.needsLargeModCount(), out);
    } else {
      DataSerializer.writeByteArray(this.farsideBaseMembershipId, out);
      out.writeLong(this.farsideBaseThreadId);
      out.writeLong(this.farsideBaseSequenceId);
      DataSerializer.writeBoolean(this.needsLargeModCount, out);
    }

    final boolean useShadowKey = useShadowKey();
    if (hasFlagsField(out)) {
      out.writeBoolean(useShadowKey);
    }

    out.writeInt(regionsSize);
    if (regionsSize > 0) {
      for (RegionCommit commit : this.regions) {
        commit.toData(out, context, useShadowKey);
      }
    }

    DataSerializer.writeObject(bridgeContext, out);
    DataSerializer.writeHashSet(this.farSiders, out);
  }

  /** Checks whether the version of the stream behind {@code out} carries the flags field. */
  private boolean hasFlagsField(final DataOutput out) {
    final Version streamVersion = InternalDataSerializer.getVersionForDataStream(out);
    return hasFlagsField(streamVersion);
  }

  /** Checks whether the version of the stream behind {@code in} carries the flags field. */
  private boolean hasFlagsField(final DataInput in) {
    final Version streamVersion = InternalDataSerializer.getVersionForDataStream(in);
    return hasFlagsField(streamVersion);
  }

  /**
   * The flags field is part of the wire format for {@code Version.GEODE_1_7_0} and anything that
   * compares greater than or equal to it.
   */
  private boolean hasFlagsField(final Version version) {
    final int ordering = version.compareTo(Version.GEODE_1_7_0);
    return ordering >= 0;
  }

  /** Shadow keys are used only when no client version is set on this message. */
  private boolean useShadowKey() {
    return clientVersion == null;
  }

  /**
   * Renders the identity hash, sequence number, processor id and tx id of this message, followed
   * by the space-separated far siders (or {@code <null>}) and every region commit.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder(256);
    text.append("TXCommitMessage@").append(System.identityHashCode(this));
    text.append("#").append(this.sequenceNum);
    text.append(" processorId=").append(this.processorId);
    text.append(" txId=").append(this.txIdent);

    if (this.farSiders == null) {
      text.append(" farSiders=<null>");
    } else {
      text.append(" farSiders=");
      // the separator is empty for the first element only
      String separator = "";
      for (Object farSider : this.farSiders) {
        text.append(separator).append(farSider);
        separator = " ";
      }
    }

    if (this.regions != null) {
      for (Object regionCommit : this.regions) {
        text.append(' ').append(regionCommit);
      }
    }
    return text.toString();
  }

  /**
   * Folds a set of partial TXCommitMessages that all belong to one transaction into a single
   * message describing the whole transaction. At commit time each node may be sent only a subset
   * of the transaction; the first message returned by the set's iterator absorbs all the others.
   *
   * @param msgSet the partial messages; must not be null
   * @return the complete txCommitMessage, or null if {@code msgSet} has no elements
   */
  public static TXCommitMessage combine(Set<TXCommitMessage> msgSet) {
    assert msgSet != null;
    TXCommitMessage complete = null;
    for (TXCommitMessage part : msgSet) {
      if (complete == null) {
        complete = part;
      } else {
        complete.combine(part);
      }
    }
    return complete;
  }

  /**
   * Combines the other TXCommitMessage into this message. Used to compute the complete
   * TXCommitMessage from its parts.
   *
   * <p>
   * Every region commit of {@code other} whose region path is not yet present in this message is
   * re-parented to this message and appended to this message's regions. Region commits whose
   * path is already present are left out; their operations are not merged.
   *
   * @param other the partial message to absorb; must not be null
   */
  public void combine(TXCommitMessage other) {
    assert other != null;
    // only the paths are needed to detect region commits that are already present
    Set<String> knownPaths = new HashSet<>();
    for (RegionCommit commit : regions) {
      knownPaths.add(commit.getRegionPath());
    }
    for (RegionCommit commit : other.regions) {
      // add() returns false when the path has been seen before
      if (knownPaths.add(commit.getRegionPath())) {
        commit.msg = this;
        this.regions.add(commit);
      }
    }
  }

  /**
   * A list of {@link RegionCommit}s that remembers whether any of its elements needs an ack and
   * increments the reference count of each element that is added through
   * {@link #add(RegionCommit)}.
   */
  public static class RegionCommitList extends ArrayList<RegionCommit> {
    private static final long serialVersionUID = -8910813949027683641L;
    /** True once an element whose {@code needsAck()} is true has been added. */
    private transient boolean needsAck = false;
    /** The region commit passed to the most recent {@link #trim} call that built a new list. */
    private transient RegionCommit trimRC = null;
    /** The list built by that {@link #trim} call; reused when the same commit is trimmed again. */
    private transient RegionCommitList trimChild = null;

    public RegionCommitList() {
      super();
    }

    /**
     * Copies the elements of {@code c}. The copy is made by the {@code ArrayList} copy
     * constructor, so the needsAck flag starts out false and reference counts are not touched
     * here; {@link #trim} takes care of both for the lists it builds.
     */
    public RegionCommitList(RegionCommitList c) {
      super(c);
    }

    public boolean getNeedsAck() {
      return this.needsAck;
    }

    /**
     * Appends {@code o}, incrementing its reference count and updating the needsAck flag.
     */
    @Override // GemStoneAddition
    public boolean add(RegionCommit o) {
      o.incRefCount();
      if (!this.needsAck && o.needsAck()) {
        this.needsAck = true;
      }
      return super.add(o);
    }

    /**
     * Creates a new list, if needed, that contains all the elements of this list except the last
     * one if it is 'rc'. Also recomputes the needsAck field of the new list.
     *
     * @param rc the region commit to drop from the end of the list
     * @return this list when it is empty or does not end with {@code rc}; otherwise a (cached)
     *         copy without the last element
     */
    public RegionCommitList trim(RegionCommit rc) {
      if (isEmpty() || get(size() - 1) != rc) {
        // no need to trim because it is empty or does not end with rc
        return this;
      }
      if (this.trimRC == rc) {
        return this.trimChild;
      }
      RegionCommitList result = new RegionCommitList(this);
      this.trimRC = rc;
      this.trimChild = result;
      result.remove(result.size() - 1);
      // the remaining elements are now referenced by one more list
      for (RegionCommit remaining : result) {
        remaining.incRefCount();
        if (remaining.needsAck()) {
          result.needsAck = true;
        }
      }
      return result;
    }

    @Override
    public String toString() {
      StringBuilder result = new StringBuilder(256);
      result.append('@').append(System.identityHashCode(this)).append(' ').append(super.toString());
      return result.toString();
    }
  }

  public static class RegionCommit {
    private final TxCallbackEventFactory txCallbackEventFactory = new TxCallbackEventFactoryImpl();
    /**
     * The region that this commit represents. Valid on both nearside and farside.
     */
    protected transient InternalRegion internalRegion;
    /**
     * Valid only on farside.
     */
    private String regionPath;
    private String parentRegionPath;
    /**
     * The message this region commit is a part of. Valid on both farside and nearside.
     */
    private transient TXCommitMessage msg;
    /**
     * Number of RegionCommitList instances that have this RegionCommit in them. Valid only on
     * nearside.
     */
    private transient int refCount = 0;
    /**
     * Valid only on nearside.
     */
    private transient HeapDataOutputStream preserializedBuffer = null;
    /**
     * Upper bound on the number of operations this region could possibly have. Valid only on
     * nearside.
     */
    transient int maxSize;
    /**
     * A list of Object; each one is the entry key for a distributed operation done by this
     * transaction. The list must be kept in sync with opEntries. Valid only on nearside.
     */
    private transient ArrayList opKeys;
    /**
     * A list of TXEntryState; each one is the entry info for a distributed operation done by this
     * transaction. The list must be kept in sync with opKeys. Valid only on nearside.
     */
    private transient ArrayList opEntries;

    private transient VersionSource memberId;

    /**
     * The persistent ids of the peers for this region. Used to mark peers as offline if they do not
     * apply the commit due to a cache close.
     */
    public Map<InternalDistributedMember, PersistentMemberID> persistentIds;

    /**
     * Used on the nearside, where the region and the upper bound on the number of operations are
     * known up front.
     *
     * @param msg the commit message this region commit is a part of
     * @param r the region this commit represents
     * @param maxSize upper bound on the number of operations for the region
     */
    RegionCommit(TXCommitMessage msg, InternalRegion r, int maxSize) {
      this.maxSize = maxSize;
      this.internalRegion = r;
      this.msg = msg;
    }

    /**
     * Used on the farside, which initializes the region later and never sets maxSize.
     *
     * @param msg the commit message this region commit is a part of
     */
    RegionCommit(TXCommitMessage msg) {
      this.msg = msg;
    }

    /**
     * @return the region path stored in this region commit
     */
    public String getRegionPath() {
      return this.regionPath;
    }

    /** Adds one to the reference count of this region commit. */
    public void incRefCount() {
      refCount += 1;
    }

    /**
     * Valid on farside after beginProcess. Used to remember what to do at region cleanup time
     */
    private boolean needsUnlock;
    /**
     * Valid on farside after beginProcess. Used to remember what to do at region cleanup time
     */
    private boolean needsLRUEnd;
    /**
     * Valid on farside after beginProcess. This is the txEvent that should be used by this
     * RegionCommit.
     */
    private TXRmtEvent txEvent;

    /**
     * Called to setup a region commit so its entryOps can be processed.
     *
     * <p>
     * Looks up the region, takes the GII lock, starts the LRU transaction and remembers in
     * {@code needsUnlock}/{@code needsLRUEnd} what {@link #endProcess()} has to undo. The given
     * {@code txEvent} is only kept when the region is already initialized.
     *
     * @param dm the distribution manager used to look up the region
     * @param txIdent the id of the transaction; only used for logging here
     * @param txEvent the event to associate with this region commit
     * @return true if region should be processed; false if it can be ignored
     * @throws CacheClosedException if the cache has been closed
     */
    boolean beginProcess(DistributionManager dm, TransactionId txIdent, TXRmtEvent txEvent)
        throws CacheClosedException {
      if (logger.isDebugEnabled()) {
        logger.debug("begin processing TXCommitMessage {} for region {}", txIdent, this.regionPath);
      }
      try {
        if (!hookupRegion(dm)) {
          return false;
        }
        if (msg.isAckRequired()
            && (this.internalRegion == null || !this.internalRegion.getScope().isDistributed())) {
          if (logger.isDebugEnabled()) {
            logger.debug("Received unneeded commit data for region {}", this.regionPath);
          }
          this.msg.addProcessingException(new RegionDestroyedException(
              "Region not found",
              this.regionPath));
          this.internalRegion = null;
          return false;
        }
        if (this.internalRegion == null) {
          // hookupRegion can succeed without finding a region; when no ack is required the check
          // above does not catch that, so bail out here instead of dereferencing null below
          if (logger.isDebugEnabled()) {
            logger.debug("Received unneeded commit data for region {}", this.regionPath);
          }
          return false;
        }
        this.needsUnlock = this.internalRegion.lockGII();
        this.internalRegion.txLRUStart();
        this.needsLRUEnd = true;
        if (this.internalRegion.isInitialized()) {
          // We don't want the txEvent to know anything about our regions
          // that are still doing gii.
          this.txEvent = txEvent;
        }
      } catch (RegionDestroyedException e) {
        this.msg.addProcessingException(e);
        // Region destroyed: Update cancelled
        if (logger.isDebugEnabled()) {
          logger.debug(
              "Received unneeded commit data for region {} because the region was destroyed.",
              this.regionPath, e);
        }
        this.internalRegion = null;
      }
      return this.internalRegion != null;
    }

    /**
     * Resolves {@code internalRegion} from the region path, falling back to the parent region path
     * (which then also replaces the region path) when the first lookup finds nothing.
     *
     * @return false only if no region was found and the distributed system is a loner; true
     *         otherwise, even when no region was found
     */
    private boolean hookupRegion(DistributionManager dm) {
      internalRegion = getRegionByPath(dm, regionPath);
      if (internalRegion == null && parentRegionPath != null) {
        internalRegion = getRegionByPath(dm, parentRegionPath);
        regionPath = parentRegionPath;
      }
      // If there are additional regions that the server enlisted in the tx,
      // which the client does not have, the client can just ignore the region
      // see bug 51922
      final boolean ignorable = internalRegion == null && dm.getSystem().isLoner();
      return !ignorable;
    }

    /**
     * Looks up a region by path in the cache of the given distribution manager.
     *
     * @return the region, or null if the distribution manager has no cache or the cache returns
     *         null for the path
     */
    LocalRegion getRegionByPath(DistributionManager dm, String regionPath) {
      final InternalCache cache = dm.getCache();
      if (cache == null) {
        return null;
      }
      return (LocalRegion) cache.getRegionByPath(regionPath);
    }

    /**
     * Called when processing is complete; only needs to be called if beginProcess returned true.
     * Ends the LRU transaction and releases the GII lock if beginProcess flagged them; the unlock
     * is attempted even when ending the LRU transaction throws.
     */
    void endProcess() {
      if (internalRegion == null) {
        return;
      }
      try {
        if (needsLRUEnd) {
          needsLRUEnd = false;
          internalRegion.txLRUEnd();
        }
      } finally {
        if (needsUnlock) {
          needsUnlock = false;
          internalRegion.unlockGII();
        }
      }
    }

    /**
     * Returns the eventId to use for the given farside entry op. The id is obtained from the
     * owning commit message using the op's event offset.
     *
     * @since GemFire 5.7
     */
    private EventID getEventId(FarSideEntryOp entryOp) {
      final int offset = entryOp.eventOffset;
      return msg.getEventId(offset);
    }


    /**
     * Apply a single tx entry op on the far side.
     *
     * <p>
     * For a {@link PartitionedRegion} no data is applied here; a callback event is created and
     * queued on {@code pendingCallbacks} instead. For any other region the op is dispatched to
     * {@code txApplyDestroy}, {@code txApplyInvalidate} or {@code txApplyPut} depending on the
     * operation. Does nothing when this region commit has no region.
     *
     * @param entryOp the far side entry op to apply
     * @param pendingCallbacks collects the callback events that are fired and released later
     */
    @SuppressWarnings("synthetic-access")
    protected void txApplyEntryOp(FarSideEntryOp entryOp, List<EntryEventImpl> pendingCallbacks) {
      if (this.internalRegion == null) {
        return;
      }
      EventID eventID = getEventId(entryOp);
      boolean isDuplicate = this.internalRegion.hasSeenEvent(eventID);
      if (this.internalRegion instanceof PartitionedRegion) {
        /*
         * This happens when we don't have the bucket and are getting adjunct notification
         */
        // No need to release because it is added to pendingCallbacks and they will be released
        // later
        EntryEventImpl eei =
            txCallbackEventFactory.createCallbackEvent(this.internalRegion, entryOp.op,
                entryOp.key,
                entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,
                entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag,
                entryOp.tailKey);
        if (entryOp.filterRoutingInfo != null) {
          eei.setLocalFilterInfo(
              entryOp.filterRoutingInfo.getFilterInfo(this.internalRegion.getCache().getMyId()));
        }
        if (isDuplicate) {
          eei.setPossibleDuplicate(true);
        }
        if (logger.isDebugEnabled()) {
          logger.debug("invoking transactional callbacks for {} key={} needsUnlock={} event={}",
              entryOp.op, entryOp.key, this.needsUnlock, eei);
        }
        // we reach this spot because the event is either delivered to this member
        // as an "adjunct" message or because the bucket was being created when
        // the message was sent and already reflects the change caused by this event.
        // In the latter case we need to invoke listeners
        final boolean skipListeners = !isDuplicate;
        eei.setInvokePRCallbacks(!skipListeners);
        pendingCallbacks.add(eei);
        return;
      }
      if (logger.isDebugEnabled()) {
        logger.debug("applying transactional {} key={} needsUnlock={} eventId {} with routing {}",
            entryOp.op, entryOp.key, this.needsUnlock, getEventId(entryOp),
            entryOp.filterRoutingInfo);
      }
      if (entryOp.versionTag != null) {
        // fill in member ids the sender left out of the version tag
        entryOp.versionTag.replaceNullIDs(this.msg.getSender());
      }
      if (entryOp.op.isDestroy()) {
        this.internalRegion.txApplyDestroy(entryOp.key, this.msg.txIdent, this.txEvent,
            this.needsUnlock,
            entryOp.op, getEventId(entryOp), entryOp.callbackArg, pendingCallbacks,
            entryOp.filterRoutingInfo, this.msg.bridgeContext, false /* origin remote */,
            null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
      } else if (entryOp.op.isInvalidate()) {
        this.internalRegion.txApplyInvalidate(entryOp.key, Token.INVALID, entryOp.didDestroy,
            this.msg.txIdent,
            this.txEvent, false /* localOp */, getEventId(entryOp), entryOp.callbackArg,
            pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext,
            null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
      } else {
        this.internalRegion.txApplyPut(entryOp.op, entryOp.key, entryOp.value, entryOp.didDestroy,
            this.msg.txIdent, this.txEvent, getEventId(entryOp), entryOp.callbackArg,
            pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext,
            null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
      }
    }

    /**
     * Fires the transactional callbacks for a single tx entry op on the far side without applying
     * any data. Only acts when the region is a {@link PartitionedRegion} and this member is not
     * among the owners of the bucket for the op's key; in every other case nothing is invoked.
     *
     * @param entryOp the far side entry op whose callbacks should be invoked
     */
    @SuppressWarnings("synthetic-access")
    protected void txApplyEntryOpAdjunctOnly(FarSideEntryOp entryOp) {
      if (this.internalRegion == null) {
        return;
      }
      EventID eventID = getEventId(entryOp);
      boolean isDuplicate = this.internalRegion.hasSeenEvent(eventID);
      if (this.internalRegion instanceof PartitionedRegion) {

        PartitionedRegion pr = (PartitionedRegion) internalRegion;
        // NOTE(review): br is dereferenced without a null check - confirm that getBucketRegion
        // cannot return null for the members this is invoked on
        BucketRegion br = pr.getBucketRegion(entryOp.key);
        Set bucketOwners = br.getBucketOwners();
        InternalDistributedMember thisMember = this.internalRegion.getDistributionManager().getId();
        if (bucketOwners.contains(thisMember)) {
          // bucket owners do not take the adjunct-only path
          return;
        }

        /*
         * This happens when we don't have the bucket and are getting adjunct notification
         */
        @Released
        EntryEventImpl eei =
            txCallbackEventFactory.createCallbackEvent(this.internalRegion, entryOp.op,
                entryOp.key,
                entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,
                entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag,
                entryOp.tailKey);
        try {
          if (entryOp.filterRoutingInfo != null) {
            eei.setLocalFilterInfo(
                entryOp.filterRoutingInfo.getFilterInfo(this.internalRegion.getCache().getMyId()));
          }
          if (isDuplicate) {
            eei.setPossibleDuplicate(true);
          }
          if (logger.isDebugEnabled()) {
            logger.debug("invoking transactional callbacks for {} key={} needsUnlock={} event={}",
                entryOp.op, entryOp.key, this.needsUnlock, eei);
          }
          // we reach this spot because the event is either delivered to this member
          // as an "adjunct" message or because the bucket was being created when
          // the message was sent and already reflects the change caused by this event.
          // In the latter case we need to invoke listeners
          final boolean skipListeners = !isDuplicate;
          eei.invokeCallbacks(this.internalRegion, skipListeners, true);
        } finally {
          // the event is not handed to anyone else, so it is released here
          eei.release();
        }
      }
    }

    /** Returns true as long as no operation key list has been created for this region commit. */
    boolean isEmpty() {
      return opKeys == null;
    }

    /** Returns true if the scope of this commit's region is distributed-ack. */
    boolean needsAck() {
      return internalRegion.getScope().isDistributedAck();
    }

    /**
     * Records one operation of the transaction for this region. The two parallel lists are created
     * on first use, sized by {@code maxSize}, and then receive the key and the entry state at the
     * same index.
     *
     * @param key the entry key of the operation
     * @param entry the entry state of the operation
     */
    void addOp(Object key, TXEntryState entry) {
      if (opKeys == null) {
        opKeys = new ArrayList(maxSize);
        opEntries = new ArrayList(maxSize);
      }
      opKeys.add(key);
      opEntries.add(entry);
    }


    /**
     * Looks up the region for this commit's path and reports whether it is neither a
     * {@link PartitionedRegion} nor used as a partitioned region bucket.
     *
     * @return false for a partitioned region or a region used for a partitioned region bucket;
     *         true otherwise, including when no region is found
     */
    public boolean isForceFireEvent(DistributionManager dm) {
      final LocalRegion region = getRegionByPath(dm, regionPath);
      if (region == null) {
        return true;
      }
      return !(region instanceof PartitionedRegion) && !region.isUsedForPartitionedRegionBucket();
    }

    /**
     * Reads this region commit from {@code in}: the region path, the parent region path and the
     * number of operations. When that number is positive, the large-mod-count flag, the member id
     * and each {@link FarSideEntryOp} follow. Every op that is read is also registered with the
     * owning commit message.
     *
     * @param in the stream to read from
     * @param hasShadowKey true if each entry op on the stream carries a shadow key
     */
    public void fromData(DataInput in, boolean hasShadowKey)
        throws IOException, ClassNotFoundException {
      regionPath = DataSerializer.readString(in);
      parentRegionPath = DataSerializer.readString(in);

      final int opCount = in.readInt();
      if (opCount <= 0) {
        return;
      }

      opKeys = new ArrayList(opCount);
      opEntries = new ArrayList(opCount);
      final boolean largeModCount = in.readBoolean();
      memberId = DataSerializer.readObject(in);
      for (int read = 0; read < opCount; read++) {
        final FarSideEntryOp farSideOp = new FarSideEntryOp();
        // shadowkey is not being sent to clients
        farSideOp.fromData(in, largeModCount, hasShadowKey);
        if (memberId != null && farSideOp.versionTag != null) {
          farSideOp.versionTag.setMemberID(memberId);
        }
        msg.addFarSideEntryOp(farSideOp);
        opKeys.add(farSideOp.key);
        opEntries.add(farSideOp);
      }
    }

    /**
     * Renders the region path (taken from the region itself when no path is stored) followed by
     * the reference count if it is positive.
     */
    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder(64);
      text.append(regionPath != null ? regionPath : internalRegion.getFullPath());
      if (refCount > 0) {
        text.append(" refCount=").append(refCount);
      }
      return text.toString();
    }

    /**
     * Writes this region commit to {@code out}: the region path, the parent region path (null
     * unless the region is a {@link BucketRegion}), the number of operations and, when there are
     * any, the large-mod-count flag, the version member (only if version tags are sent) and then
     * each key followed by its entry data.
     *
     * @param out the stream to write to
     * @param context the serialization context handed on to the entry states
     * @param useShadowKey true if the shadow key of each entry should be written
     */
    private void basicToData(DataOutput out,
        SerializationContext context,
        boolean useShadowKey) throws IOException {
      if (internalRegion == null) {
        DataSerializer.writeString(regionPath, out);
        DataSerializer.writeString(parentRegionPath, out);
      } else {
        DataSerializer.writeString(internalRegion.getFullPath(), out);
        String parentPath = null;
        if (internalRegion instanceof BucketRegion) {
          parentPath = ((Bucket) internalRegion).getPartitionedRegion().getFullPath();
        }
        DataSerializer.writeString(parentPath, out);
      }

      final int opCount = isEmpty() ? 0 : opKeys.size();
      out.writeInt(opCount);
      if (opCount == 0) {
        return;
      }

      final boolean largeModCount =
          msg.txState != null ? msg.txState.needsLargeModCount() : msg.needsLargeModCount;
      out.writeBoolean(largeModCount);

      final boolean sendVersionTags =
          msg.clientVersion == null || Version.GFE_70.compareTo(msg.clientVersion) <= 0;
      if (sendVersionTags) {
        VersionSource member = memberId;
        if (member == null) {
          if (internalRegion != null) {
            member = internalRegion.getVersionMember();
          } else {
            Assert.assertTrue(msg.txState == null);
          }
        }
        DataSerializer.writeObject(member, out);
      }

      for (int idx = 0; idx < opCount; idx++) {
        DataSerializer.writeObject(opKeys.get(idx), out);
        final Object entry = opEntries.get(idx);
        if (msg.txState != null) {
          /* we are still on tx node and have the entry state */
          ((TXEntryState) entry).toFarSideData(out, context, largeModCount, sendVersionTags,
              useShadowKey);
        } else {
          ((FarSideEntryOp) entry).toData(out, largeModCount, sendVersionTags, useShadowKey);
        }
      }
    }


    /**
     * Writes this region commit to {@code out}. A previously serialized buffer is rewound and
     * replayed if there is one. Otherwise, when the reference count is greater than one, the data
     * is serialized into a new buffer that is kept for later calls and then sent; with a lower
     * reference count the data is written straight to {@code out}.
     */
    public void toData(DataOutput out, SerializationContext context, boolean useShadowKey)
        throws IOException {
      if (preserializedBuffer != null) {
        preserializedBuffer.rewind();
        preserializedBuffer.sendTo(out);
        return;
      }
      if (refCount <= 1) {
        basicToData(out, context, useShadowKey);
        return;
      }
      final Version streamVersion = InternalDataSerializer.getVersionForDataStream(out);
      final HeapDataOutputStream buffer = new HeapDataOutputStream(1024, streamVersion);
      basicToData(buffer, context, useShadowKey);
      preserializedBuffer = buffer;
      preserializedBuffer.sendTo(out);
    }

    /**
     * Holds data that describes a tx entry op on the far side. Instances are ordered, and compared
     * for equality, by {@code modSerialNum} alone.
     *
     * @since GemFire 5.0
     */
    public class FarSideEntryOp implements Comparable {
      public Operation op;
      public int modSerialNum;
      public int eventOffset;
      public Object key;
      public Object value;
      public boolean didDestroy;
      public Object callbackArg;
      private FilterRoutingInfo filterRoutingInfo;
      private VersionTag versionTag;
      private long tailKey;

      /**
       * Create a new representation of a tx entry op on the far side. All init will be done by a
       * call to fromData
       */
      public FarSideEntryOp() {}

      /**
       * Initializes this tx entry op on the far side from the given input. The "toData" that this
       * should match is {@link TXEntryState#toFarSideData}.
       *
       * @param in the data input that is used to read the data for this entry op
       * @param largeModCount true if the mod count is a int instead of a byte.
       * @param readShadowKey true if a long shadowKey should be read
       */
      public void fromData(DataInput in, boolean largeModCount, boolean readShadowKey)
          throws IOException, ClassNotFoundException {
        this.key = DataSerializer.readObject(in);
        this.op = Operation.fromOrdinal(in.readByte());
        if (largeModCount) {
          this.modSerialNum = in.readInt();
        } else {
          this.modSerialNum = in.readByte();
        }
        this.callbackArg = DataSerializer.readObject(in);
        this.filterRoutingInfo = DataSerializer.readObject(in);
        this.versionTag = DataSerializer.readObject(in);
        if (readShadowKey) {
          this.tailKey = in.readLong();
        }
        this.eventOffset = in.readInt();
        // destroys carry neither the didDestroy flag nor a value; invalidates carry no value
        if (!this.op.isDestroy()) {
          this.didDestroy = in.readBoolean();
          if (!this.op.isInvalidate()) {
            boolean isTokenOrByteArray = in.readBoolean();
            if (isTokenOrByteArray) {
              // token or byte[]
              this.value = DataSerializer.readObject(in);
            } else {
              // CachedDeserializable, Object, or PDX
              this.value = CachedDeserializableFactory.create(DataSerializer.readByteArray(in),
                  GemFireCacheImpl.getInstance());
            }
          }
        }
      }

      /**
       * Writes this entry op to {@code out}. The key is not written here because the caller has
       * already done that.
       *
       * @param out the stream to write to
       * @param largeModCount true if the mod count is written as an int instead of a byte
       * @param sendVersionTag true if the version tag should be written
       * @param sendShadowKey true if the long shadow key should be written
       */
      public void toData(DataOutput out, boolean largeModCount, boolean sendVersionTag,
          boolean sendShadowKey) throws IOException {
        // DataSerializer.writeObject(this.key,out);
        /* Don't serialize key because caller did that already */

        out.writeByte(this.op.ordinal);
        if (largeModCount) {
          out.writeInt(this.modSerialNum);
        } else {
          out.writeByte(this.modSerialNum);
        }
        DataSerializer.writeObject(this.callbackArg, out);
        DataSerializer.writeObject(this.filterRoutingInfo, out);
        if (sendVersionTag) {
          DataSerializer.writeObject(this.versionTag, out);
        }
        if (sendShadowKey) {
          out.writeLong(this.tailKey);
        }
        out.writeInt(this.eventOffset);
        if (!this.op.isDestroy()) {
          out.writeBoolean(this.didDestroy);
          if (!this.op.isInvalidate()) {
            // tokens and byte arrays are written as objects, anything else as a byte array
            boolean sendObject = Token.isInvalidOrRemoved(this.value);
            sendObject = sendObject || this.value instanceof byte[];
            out.writeBoolean(sendObject);
            if (sendObject) {
              DataSerializer.writeObject(this.value, out);
            } else {
              DataSerializer.writeObjectAsByteArray(this.value, out);
            }
          }
        }
      }


      /**
       * Performs this entryOp on the farside of a tx commit.
       */
      public void process(List<EntryEventImpl> pendingCallbacks) {
        txApplyEntryOp(this, pendingCallbacks);
      }

      /**
       * Hands this entryOp to the enclosing region commit's adjunct-only processing.
       */
      public void processAdjunctOnly() {
        txApplyEntryOpAdjunctOnly(this);
      }

      public RegionCommit getRegionCommit() {
        return RegionCommit.this;
      }

      /**
       * Returns the value to use to sort us
       */
      private int getSortValue() {
        return this.modSerialNum;
      }

      /**
       * Orders entry ops by their sort value.
       *
       * @throws ClassCastException if {@code o} is not a FarSideEntryOp
       */
      @Override
      public int compareTo(Object o) {
        FarSideEntryOp other = (FarSideEntryOp) o;
        // Integer.compare cannot overflow, unlike subtracting the two values
        return Integer.compare(getSortValue(), other.getSortValue());
      }

      @Override
      public boolean equals(Object o) {
        // instanceof is false for null, so no separate null check is needed
        if (!(o instanceof FarSideEntryOp)) {
          return false;
        }
        return compareTo(o) == 0;
      }

      @Override
      public int hashCode() {
        return getSortValue();
      }
    }
  }

  /**
   * Returns the key under which this message is tracked: the lock id when there is one, the
   * transaction id otherwise.
   */
  Object getTrackerKey() {
    return lockId != null ? lockId : txIdent;
  }

  /**
   * Used to prevent processing of the message if we have reported to other FarSiders that we did
   * not receive the CommitProcessMessage.
   *
   * @return true if this message has been marked as not to be processed
   */
  boolean dontProcess() {
    return dontProcess;
  }

  /**
   * Marks this message so that it is not processed if the CommitProcessMessage arrives late.
   */
  void setDontProcess() {
    dontProcess = true;
  }

  /** Returns the current value of the isProcessing flag. */
  boolean isProcessing() {
    return isProcessing;
  }

  /** Sets the isProcessing flag to the given value. */
  private void setIsProcessing(boolean isProcessing) {
    TXCommitMessage.this.isProcessing = isProcessing;
  }

  /** Returns the current value of the wasProcessed flag. */
  boolean wasProcessed() {
    return wasProcessed;
  }

  /** Sets the wasProcessed flag to the given value. */
  void setProcessed(boolean wasProcessed) {
    TXCommitMessage.this.wasProcessed = wasProcessed;
  }

  /**
   * The CommitProcessForLockIdMessage is sent by the Distributed ACK TX origin to the recipients
   * (aka FarSiders) to indicate that a previously received RegionCommit that contained a lockId
   * should commence processing.
   */
  public static class CommitProcessForLockIdMessage extends CommitProcessMessage {
    private TXLockId lockId;

    /** Zero arg constructor for DataSerializer. */
    public CommitProcessForLockIdMessage() {}

    /**
     * @param lockId the lock id of the commit to process; must not be null
     */
    public CommitProcessForLockIdMessage(TXLockId lockId) {
      this.lockId = lockId;
      Assert.assertTrue(lockId != null,
          "CommitProcessForLockIdMessage must have a non-null lockid!");
    }

    /** Waits for the commit data registered under the lock id and then processes it. */
    @Override
    protected void process(ClusterDistributionManager dm) {
      final TXCommitMessage commitData = waitForMessage(lockId, dm);
      Assert.assertTrue(commitData != null,
          "Commit data for TXLockId: " + lockId + " not found");
      basicProcess(commitData, dm);
    }

    @Override
    public int getDSFID() {
      return COMMIT_PROCESS_FOR_LOCKID_MESSAGE;
    }

    /** Writes only the lock id. */
    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {
      InternalDataSerializer.invokeToData(lockId, out);
    }

    /** Reads the lock id and asserts that one was present. */
    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      lockId = TXLockIdImpl.createFromData(in);
      Assert.assertTrue(lockId != null,
          "CommitProcessForLockIdMessage must have a non-null lockid!");
    }

    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder(128);
      text.append("CommitProcessForLockIdMessage@");
      text.append(System.identityHashCode(this));
      text.append(" lockId=");
      text.append(lockId);
      return text.toString();
    }
  }

  /**
   * The CommitProcessForTXIdMessage is sent by the Distributed ACK TX origin to the recipients
   * (aka FarSiders) to indicate that a previously received RegionCommit that contained a TXId
   * should commence processing. RegionCommit messages that contain a TXId (and no TXLockId) are
   * typically sent if all the TX changes are a result of load/netsearch/netload values (thus no
   * lockid).
   */
  public static class CommitProcessForTXIdMessage extends CommitProcessMessage {
    private TXId txId;

    /** Zero arg constructor for DataSerializer. */
    public CommitProcessForTXIdMessage() {}

    /**
     * @param txId the transaction id of the commit to process; must not be null
     */
    public CommitProcessForTXIdMessage(TXId txId) {
      this.txId = txId;
      Assert.assertTrue(txId != null,
          "CommitProcessMessageForTXId must have a non-null txid!");
    }

    /** Waits for the commit data registered under the transaction id and then processes it. */
    @Override
    protected void process(ClusterDistributionManager dm) {
      final TXCommitMessage commitData = waitForMessage(txId, dm);
      Assert.assertTrue(commitData != null, "Commit data for TXId: " + txId + " not found");
      basicProcess(commitData, dm);
    }

    @Override
    public int getDSFID() {
      return COMMIT_PROCESS_FOR_TXID_MESSAGE;
    }

    /** Writes only the transaction id. */
    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {
      InternalDataSerializer.invokeToData(txId, out);
    }

    /** Reads the transaction id and asserts that one was present. */
    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      txId = TXId.createFromData(in);
      Assert.assertTrue(txId != null,
          "CommitProcessMessageForTXId must have a non-null txid!");
    }

    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder(128);
      text.append("CommitProcessForTXIdMessage@");
      text.append(System.identityHashCode(this));
      text.append(" txId=");
      text.append(txId);
      return text.toString();
    }
  }

  /**
   * Base class of the "commit process" messages; it holds the processing step they share.
   */
  public abstract static class CommitProcessMessage extends PooledDistributionMessage {
    /**
     * Deregisters {@code mess} as a membership listener and, unless {@code mess.dontProcess()}
     * says otherwise, runs its commit. The tracker is told the message was processed whether or
     * not the commit throws.
     *
     * @param mess the commit message to process; also used as the monitor for the check
     * @param dm the distribution manager the message is registered with as a listener
     */
    protected void basicProcess(final TXCommitMessage mess, final ClusterDistributionManager dm) {
      dm.removeMembershipListener(mess);

      // Only the dontProcess() check runs under the message's monitor; the commit does not.
      final boolean skip;
      synchronized (mess) {
        skip = mess.dontProcess();
      }
      if (skip) {
        return;
      }

      try {
        mess.basicProcess();
      } finally {
        txTracker.processed(mess);
      }
    }
  }

  /**
   * The CommitProcessQueryMessage is used to attempt to recover - in the Distributed ACK TXs - when
   * the origin of the CommitProcess messages departed from the distributed system. The sender of
   * this message is attempting to query other potential fellow FarSiders (aka recipients) who may
   * have received the CommitProcess message.
   *
   * Since the occurrence of this message will be rare (hopefully), it was decided to be general
   * about the tracker key - opting not to have specific messages for each type like
   * CommitProcessFor<Lock/TX>Id - and take the performance penalty of an extra call to
   * DataSerializer
   */
  public static class CommitProcessQueryMessage extends PooledDistributionMessage {
    private Object trackerKey; // Either a TXLockId or a TXId
    /** Id of the reply processor on the querying member; echoed back in the reply. */
    private int processorId;

    /** Creates an empty message whose fields are filled in by {@code fromData}. */
    public CommitProcessQueryMessage() {
      // Zero arg constructor for DataSerializer
    }

    /**
     * @param trackerKey the key to look up in the receiver's tracker history
     * @param processorId id of the reply processor waiting for the answer
     */
    public CommitProcessQueryMessage(Object trackerKey, int processorId) {
      this.trackerKey = trackerKey;
      this.processorId = processorId;
    }

    /**
     * Asks the tracker whether a commit-process message was received for the key and sends the
     * answer back to the sender of this query.
     */
    @Override
    protected void process(ClusterDistributionManager dm) {
      final boolean processMsgReceived = txTracker.commitProcessReceived(this.trackerKey);
      if (!processMsgReceived && logger.isDebugEnabled()) {
        logger.debug("CommitProcessQuery did not find {} in the history", this.trackerKey);
      }

      // Reply to the fellow FarSider as to whether the
      // CommitProcess message was received
      CommitProcessQueryReplyMessage resp = new CommitProcessQueryReplyMessage(processMsgReceived);
      resp.setProcessorId(this.processorId);
      resp.setRecipient(this.getSender());
      dm.putOutgoing(resp);
    }

    /** Writes the tracker key followed by the processor id. */
    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {
      DataSerializer.writeObject(this.trackerKey, out);
      out.writeInt(this.processorId);
    }

    @Override
    public int getDSFID() {
      return COMMIT_PROCESS_QUERY_MESSAGE;
    }

    /** Reads the fields in the order {@code toData} wrote them. */
    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      this.trackerKey = DataSerializer.readObject(in);
      this.processorId = in.readInt();
    }

    /**
     * Describes this message. The tracker key may be null (for example on an instance built with
     * the no-arg constructor), so its class name is only looked up when a key is present.
     */
    @Override
    public String toString() {
      final Object key = this.trackerKey;
      final String keyClassName = key == null ? "null" : key.getClass().getName();
      StringBuilder result = new StringBuilder(128);
      result.append("CommitProcessQueryMessage@").append(System.identityHashCode(this))
          .append(" trackerKeyClass=").append(keyClassName)
          .append(" trackerKey=").append(key).append(" processorId=")
          .append(this.processorId);
      return result.toString();
    }
  }

  /********************* Commit Process Query Response Message **********************************/
  /**
   * Reply to a {@link CommitProcessQueryMessage}: a single flag saying whether the replying
   * member's tracker had seen the commit-process message.
   */
  public static class CommitProcessQueryReplyMessage extends ReplyMessage {
    /** Answer to the query, written after the superclass state on the wire. */
    private boolean wasReceived;

    /** Creates an empty reply whose flag is filled in by {@code fromData}. */
    public CommitProcessQueryReplyMessage() {
      // zero arg constructor for DataSerializer
    }

    /**
     * @param wasReceived the answer to send back to the querying member
     */
    public CommitProcessQueryReplyMessage(boolean wasReceived) {
      this.wasReceived = wasReceived;
    }

    /**
     * @return the answer carried by this reply
     */
    public boolean wasReceived() {
      return this.wasReceived;
    }

    @Override
    public int getDSFID() {
      return COMMIT_PROCESS_QUERY_REPLY_MESSAGE;
    }

    /** Writes the superclass state first, then the answer flag. */
    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {
      super.toData(out, context);
      out.writeBoolean(this.wasReceived);
    }

    /** Reads the superclass state first, then the answer flag. */
    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      super.fromData(in, context);
      this.wasReceived = in.readBoolean();
    }

    @Override
    public String toString() {
      return "CommitProcessQueryReplyMessage@" + System.identityHashCode(this)
          + " wasReceived=" + this.wasReceived
          + " processorId=" + this.processorId
          + " from " + this.getSender();
    }
  }

  /********************* Commit Process Query Response Processor *********************************/
  /**
   * Reply processor for {@link CommitProcessQueryMessage}. It remembers whether any reply was
   * positive and reports that it can stop waiting as soon as one has been seen.
   */
  public static class CommitProcessQueryReplyProcessor extends ReplyProcessor21 {
    /** Set to true by {@link #process} on the first positive reply; never reset here. */
    public boolean receivedOnePositive = false;

    /**
     * @param dm the distribution manager handed to the superclass
     * @param members the members handed to the superclass
     */
    CommitProcessQueryReplyProcessor(DistributionManager dm, Set members) {
      super(dm, members);
    }

    /**
     * Records a positive answer, if the reply carries one, before delegating to the superclass.
     */
    @Override
    public void process(DistributionMessage msg) {
      final CommitProcessQueryReplyMessage reply = (CommitProcessQueryReplyMessage) msg;
      final boolean positive = reply.wasReceived();
      if (positive) {
        // Only ever written to true, so a later negative reply cannot undo a positive one.
        this.receivedOnePositive = true;
      }
      super.process(msg);
    }

    /**
     * @return true once a positive reply has been recorded
     */
    @Override
    protected boolean canStopWaiting() {
      return receivedOnePositive;
    }

    /**
     * @return true if at least one processed reply was positive
     */
    public boolean receivedACommitProcessMessage() {
      return receivedOnePositive;
    }
  }

  /********************* MembershipListener Implementation ***************************************/
  /**
   * Membership callback for a joining member; this implementation takes no action.
   */
  @Override
  public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) {
    // do nothing
  }

  /**
   * Membership callback for a suspected member; this implementation takes no action.
   */
  @Override
  public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
      InternalDistributedMember whoSuspected, String reason) {}

  /**
   * Membership callback for a lost quorum; this implementation takes no action.
   */
  @Override
  public void quorumLost(DistributionManager distributionManager,
      Set<InternalDistributedMember> failures,
      List<InternalDistributedMember> remaining) {}

  /**
   * Reports the departure flag. {@code memberDeparted} sets it when the departing member is this
   * message's sender and the message was not already being processed.
   *
   * @return true if the member initiating this transaction has left the cluster
   */
  public boolean isDepartureNoticed() {
    return this.departureNoticed;
  }

  /**
   * Processes this commit locally after its origin has departed, then tells the tracker the
   * message was processed (even if processing throws).
   */
  private void doOriginDepartedCommit() {
    // Set processor to zero to avoid the ack to the now departed origin
    processorId = 0;
    try {
      basicProcess();
    } finally {
      txTracker.processed(this);
    }
  }

  /**
   * Membership callback for a departing member. Departures of anyone but this message's sender
   * are ignored. For the sender, this message deregisters itself as a listener and, unless it is
   * already processing or the departure was already noticed, records the departure and starts a
   * thread. With no other recipients the thread processes the commit directly; otherwise it
   * queries the other recipients first.
   *
   * @param distributionManager the manager this message is deregistered from
   * @param id the member that departed
   * @param crashed not used by this implementation
   */
  @Override
  public void memberDeparted(DistributionManager distributionManager,
      final InternalDistributedMember id, boolean crashed) {

    final boolean senderLeft = getSender().equals(id);
    if (!senderLeft) {
      return;
    }
    distributionManager.removeMembershipListener(this);

    // Check and set the flag under this message's monitor so only one caller proceeds.
    synchronized (this) {
      final boolean nothingToDo = isProcessing() || this.departureNoticed;
      if (nothingToDo) {
        if (logger.isDebugEnabled()) {
          logger.debug("Member departed: Commit data is already being processed for lockid: {}",
              lockId);
        }
        return;
      }
      this.departureNoticed = true;
    }

    if (getFarSiders() == null || getFarSiders().isEmpty()) {
      if (logger.isDebugEnabled()) {
        logger.debug("Member departed: {}. Processing commit data.", getSender());
      }

      // Optimal case where we are the only FarSider: assume the CommitProcess message will
      // never arrive, which does not matter since we can commit anyway. The commit runs on
      // a new thread.
      new LoggingThread("Origin Departed Commit", this::doOriginDepartedCommit).start();
      return;
    }

    // There are fellow FarSiders (aka recipients): ask them whether any one of them has
    // received a CommitProcessMessage.
    if (logger.isDebugEnabled()) {
      logger.debug(
          "Member departed: {} sending query for CommitProcess message to other recipients.", id);
    }

    // A new thread sends the CommitProcessQuery, waits for a response and potentially
    // processes the commit. (Open question from the original author: use a thread pool
    // instead, e.g. one found in DM or a zero sized pool.)
    new LoggingThread("CommitProcessQuery Thread", () -> doCommitProcessQuery(id)).start();
  }

  /**
   * @return the far siders (the fellow recipients queried after the origin departs);
   *         {@code memberDeparted} treats a null or empty set as "no other recipients"
   */
  HashSet getFarSiders() {
    return this.farSiders;
  }

  /**
   * @return the distribution manager held by this message
   */
  DistributionManager getDistributionManager() {
    return this.dm;
  }

  /**
   * Asks the other recipients whether any of them received the "commit process" message from the
   * departed origin, and acts on the answer: if at least one did, this commit is processed
   * locally; otherwise the message is removed from the tracker without being processed.
   *
   * @param id the departed origin; used only in log output
   */
  void doCommitProcessQuery(final InternalDistributedMember id) {
    CommitProcessQueryReplyProcessor replyProcessor = createReplyProcessor();
    CommitProcessQueryMessage queryMessage = createQueryMessage(replyProcessor);
    queryMessage.setRecipients(this.farSiders);
    getDistributionManager().putOutgoing(queryMessage);
    // Wait for any one positive response or all negative responses.
    // (while() loop removed for bug 36983 - you can't loop on waitForReplies()
    getDistributionManager().getCancelCriterion().checkCancelInProgress(null);
    try {
      replyProcessor.waitForRepliesUninterruptibly();
    } catch (ReplyException e) {
      e.handleCause();
    }

    if (!replyProcessor.receivedACommitProcessMessage()) {
      if (logger.isDebugEnabled()) {
        logger.debug(
            "Transaction associated with lockID: {} from origin {} ignored.  No other recipients received \"commit process\" message",
            lockId, id);
      }
      txTracker.removeMessage(this);
      return;
    }

    if (logger.isDebugEnabled()) {
      logger.debug(
          "Transaction associated with lockID: {} from origin {} is processing due to a received \"commit process\" message",
          lockId, id);
    }
    // Same steps as the no-other-recipients path: clear the processor id so no ack goes to
    // the departed origin, process, and always notify the tracker.
    doOriginDepartedCommit();
  }

  /**
   * @return a new reply processor built from this message's distribution manager and far siders
   */
  CommitProcessQueryReplyProcessor createReplyProcessor() {
    return new CommitProcessQueryReplyProcessor(this.dm, this.farSiders);
  }

  /**
   * Builds the query sent to the other recipients.
   *
   * @param replyProcessor the processor whose id is placed in the query
   * @return a query carrying this message's tracker key and the processor's id
   */
  CommitProcessQueryMessage createQueryMessage(CommitProcessQueryReplyProcessor replyProcessor) {
    final Object key = getTrackerKey();
    final int replyProcessorId = replyProcessor.getProcessorId();
    return new CommitProcessQueryMessage(key, replyProcessorId);
  }

  /**
   * Extracts the member id from a tracker key.
   *
   * @param trackerKey a {@code TXId} or a {@code TXLockId}
   * @return the key's member id, or null when the key is of neither type
   */
  private DistributedMember getMemberFromTrackerKey(Object trackerKey) {
    if (trackerKey instanceof TXId) {
      return ((TXId) trackerKey).getMemberId();
    }
    if (trackerKey instanceof TXLockId) {
      return ((TXLockId) trackerKey).getMemberId();
    }
    return null;
  }

  /**
   * Sets the flag that {@code updateLockMembers} checks before updating the lock's
   * participants.
   */
  void setUpdateLockMembers() {
    lockNeedsUpdate = true;
  }

  /**
   * Intended to be called after TXState.applyChanges when the potential for a different set of TX
   * members has been determined and it is safe to ignore any new members because the changes have
   * been applied to committed state. This was added as the solution to bug 32999 and the recovery
   * when the TXLock Lessor (the sending VM) crashes/departs before or while sending the
   * TXCommitMessage.
   *
   * @see TXState#commit()
   * @see org.apache.geode.internal.cache.locks.TXLockBatch#getBatchId()
   */
  private void updateLockMembers() {
    // Nothing to do unless an update was requested and this commit has a lock id.
    if (!this.lockNeedsUpdate || this.lockId == null) {
      return;
    }
    // The new participants are the keys of the per-member message map.
    final TXLockService lockService = TXLockService.createDTLS(this.dm.getSystem());
    lockService.updateParticipants(this.lockId, this.msgMap.keySet());
  }

  /**
   * Reply processor which collects all CommitReplyExceptions and emits a detailed failure exception
   * if problems occur
   *
   * @since GemFire 5.7
   */
  private class CommitReplyProcessor extends ReliableReplyProcessor21 {
    /** Passed to the collected exception when handling a potential commit failure. */
    private final HashMap msgMap;

    /**
     * @param dm the distribution manager handed to the superclass
     * @param initMembers the members handed to the superclass
     * @param msgMap the map later passed to {@code handlePotentialCommitFailure}
     */
    public CommitReplyProcessor(DistributionManager dm, Set initMembers, HashMap msgMap) {
      super(dm, initMembers);
      this.msgMap = msgMap;
    }

    /**
     * Waits for the replies; if the wait ends with the collected exception, lets that exception
     * decide whether the commit failed.
     */
    public void waitForCommitCompletion() {
      try {
        waitForRepliesUninterruptibly();
      } catch (CommitExceptionCollectingException collected) {
        collected.handlePotentialCommitFailure(msgMap);
      }
    }

    /**
     * Adds the exception(s) carried by a reply to the collecting exception, creating the
     * collector on first use. Messages that are not {@code ReplyMessage}s are ignored.
     */
    @Override
    protected void processException(DistributionMessage msg, ReplyException ex) {
      if (!(msg instanceof ReplyMessage)) {
        return;
      }
      synchronized (this) {
        if (this.exception == null) {
          // Exception Container
          this.exception = new CommitExceptionCollectingException(txIdent);
        }
        final CommitExceptionCollectingException collector =
            (CommitExceptionCollectingException) this.exception;
        final InternalDistributedMember sender = msg.getSender();
        // A CommitReplyException carries a set of exceptions; anything else is added as is.
        final Set reported;
        if (ex instanceof CommitReplyException) {
          reported = ((CommitReplyException) ex).getExceptions();
        } else {
          reported = Collections.singleton(ex);
        }
        collector.addExceptionsFromMember(sender, reported);
      }
    }

    /** Always false, so exceptions in replies do not end the wait early. */
    @Override
    protected boolean stopBecauseOfExceptions() {
      return false;
    }

    /**
     * @return the members recorded as having closed their cache, or an empty set when no
     *         exception has been collected
     */
    public Set getCacheClosedMembers() {
      if (this.exception == null) {
        return Collections.emptySet();
      }
      return ((CommitExceptionCollectingException) this.exception).getCacheClosedMembers();
    }

    /**
     * @param regionFullPath the full path of the region of interest
     * @return the members recorded as having destroyed that region, or an empty set when no
     *         exception has been collected
     */
    public Set getRegionDestroyedMembers(String regionFullPath) {
      if (this.exception == null) {
        return Collections.emptySet();
      }
      return ((CommitExceptionCollectingException) this.exception)
          .getRegionDestroyedMembers(regionFullPath);
    }
  }

  /**
   * An Exception that collects many remote CommitExceptions
   *
   * @since GemFire 5.7
   */
  public static class CommitExceptionCollectingException extends ReplyException {
    private static final long serialVersionUID = 589384721273797822L;
    /**
     * Set of members that threw CacheClosedExceptions
     */
    private final Set<InternalDistributedMember> cacheExceptions;
    /**
     * key=region path, value=Set of members
     */
    private final Map<String, Set<InternalDistributedMember>> regionExceptions;
    /**
     * key=member, value=list of exceptions that were unexpected and caused the tx to fail
     */
    private final Map<InternalDistributedMember, List<Exception>> fatalExceptions;

    /** Id of the transaction, used in the failure message. */
    private final TXId id;

    /**
     * @param txIdent id of the transaction whose replies are being collected
     */
    public CommitExceptionCollectingException(TXId txIdent) {
      this.cacheExceptions = new HashSet<>();
      this.regionExceptions = new HashMap<>();
      this.fatalExceptions = new HashMap<>();
      this.id = txIdent;
    }

    /**
     * Determine if the commit processing was incomplete, if so throw a detailed exception
     * indicating the source of the problem. When there are no fatal exceptions, members that
     * closed their cache or destroyed a region are marked offline instead.
     *
     * @param msgMap the region commits that were sent, keyed by recipient
     * @throws CommitIncompleteException if any member reported an unexpected exception
     */
    public void handlePotentialCommitFailure(
        HashMap<InternalDistributedMember, RegionCommitList> msgMap) {
      if (!fatalExceptions.isEmpty()) {
        throw new CommitIncompleteException(describeFatalExceptions());
      }

      // Mark any persistent members as offline
      handleClosedMembers(msgMap);
      handleRegionDestroyed(msgMap);
    }

    /**
     * Builds the failure message: for each member, every fatal exception with its stack trace,
     * with the exceptions of one member separated by "AND".
     */
    private String describeFatalExceptions() {
      StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
          .append(id).append(".  Caused by the following exceptions: ");
      for (Map.Entry<InternalDistributedMember, List<Exception>> entry : fatalExceptions
          .entrySet()) {
        errorMessage.append(" From member: ").append(entry.getKey()).append(" ");
        boolean first = true;
        for (Exception e : entry.getValue()) {
          if (!first) {
            errorMessage.append("\nAND\n");
          }
          first = false;
          errorMessage.append(e);
          for (StackTraceElement ste : e.getStackTrace()) {
            errorMessage.append("\n\tat ").append(ste);
          }
        }
        errorMessage.append(".");
      }
      return errorMessage.toString();
    }

    /**
     * Mark peers as offline for regions that the peer returned a RegionDestroyedException
     */
    private void handleRegionDestroyed(
        HashMap<InternalDistributedMember, RegionCommitList> msgMap) {
      if (regionExceptions.isEmpty()) {
        return;
      }

      for (Map.Entry<InternalDistributedMember, RegionCommitList> memberMap : msgMap.entrySet()) {
        InternalDistributedMember member = memberMap.getKey();
        RegionCommitList rcl = memberMap.getValue();
        for (RegionCommit region : rcl) {
          Set<InternalDistributedMember> failedMembers =
              regionExceptions.get(region.internalRegion.getFullPath());
          if (failedMembers != null && failedMembers.contains(member)) {
            markMemberOffline(member, region);
          }
        }
      }
    }

    /**
     * Mark peers as offline that returned a cache closed exception
     */
    private void handleClosedMembers(HashMap<InternalDistributedMember, RegionCommitList> msgMap) {
      for (InternalDistributedMember member : getCacheClosedMembers()) {
        RegionCommitList rcl = msgMap.get(member);
        if (rcl == null) {
          // No region commits are recorded for this member, so there is nothing to mark.
          continue;
        }

        for (RegionCommit region : rcl) {
          markMemberOffline(member, region);
        }
      }
    }

    /**
     * Marks {@code member} offline for {@code region} if the region commit carries a persistent
     * id for that member; otherwise does nothing.
     */
    private void markMemberOffline(InternalDistributedMember member, RegionCommit region) {
      if (region.persistentIds == null) {
        return;
      }

      PersistentMemberID persistentId = region.persistentIds.get(member);
      if (persistentId != null) {
        // Fix for bug 42142 - In order for recovery to work,
        // we must either
        // 1) persistent the region operation successfully on the peer
        // 2) record that the peer is offline
        // or
        // 3) fail the operation

        // if we have started to shutdown, we don't want to mark the peer
        // as offline, or we will think we have newer data when in fact we don't
        region.internalRegion.getCancelCriterion().checkCancelInProgress(null);

        // Otherwise, mark the peer as offline, because it didn't complete
        // the operation.
        ((DistributedRegion) region.internalRegion).getPersistenceAdvisor().markMemberOffline(
            member,
            persistentId);
      }
    }

    /**
     * @return the live set of members that reported a {@code CancelException}
     */
    public Set<InternalDistributedMember> getCacheClosedMembers() {
      return this.cacheExceptions;
    }

    /**
     * @param regionFullPath the full path of the region of interest
     * @return the members that reported the region destroyed, or an empty set if there are none
     */
    public Set getRegionDestroyedMembers(String regionFullPath) {
      Set<InternalDistributedMember> members = this.regionExceptions.get(regionFullPath);
      if (members == null) {
        return Collections.emptySet();
      }
      return members;
    }

    /**
     * Sorts the exceptions reported by a member into cache-closed, region-destroyed and fatal.
     * The collections are not synchronized here; the {@code CommitReplyProcessor} in this file
     * calls this method while holding its own monitor.
     *
     * @param member the member that reported the exceptions
     * @param exceptions the reported exceptions; every element must be an {@code Exception}
     */
    public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
      for (Object element : exceptions) {
        Exception ex = (Exception) element;
        if (ex instanceof CancelException) {
          cacheExceptions.add(member);
        } else if (ex instanceof RegionDestroyedException) {
          String r = ((RegionDestroyedException) ex).getRegionFullPath();
          regionExceptions.computeIfAbsent(r, key -> new HashSet<>()).add(member);
        } else {
          fatalExceptions.computeIfAbsent(member, key -> new ArrayList<>(2)).add(ex);
        }
      }
    }
  }

  /**
   * Calls {@code hookupRegion} on every region commit held by this message; does nothing when
   * there are none.
   *
   * @param dm the distribution manager passed on to each region commit
   */
  public void hookupRegions(DistributionManager dm) {
    if (regions == null) {
      return;
    }
    for (Object element : regions) {
      ((RegionCommit) element).hookupRegion(dm);
    }
  }


  /**
   * Enables or disables firing of TX Listeners. Currently only used on clients.
   *
   * @param b true to disable the listeners, false to leave them enabled
   */
  public void setDisableListeners(boolean b) {
    // Honor the argument; previously the flag was set to true whatever was passed.
    disableListeners = b;
  }

  /**
   * @return the version last stored by {@link #setClientVersion(Version)}
   */
  public Version getClientVersion() {
    return this.clientVersion;
  }

  /**
   * Stores the given version in this message.
   *
   * @param clientVersion the value subsequently returned by {@link #getClientVersion()}
   */
  public void setClientVersion(Version clientVersion) {
    this.clientVersion = clientVersion;
  }

}
