/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.partitioned;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.DataLocationException;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionException;
import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.TransactionMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * The base PartitionedRegion message type upon which other messages should be based.
 *
 * @since GemFire 5.0
 */
public abstract class PartitionMessage extends DistributionMessage
    implements MessageWithReply, TransactionMessage {
  private static final Logger logger = LogService.getLogger();

  /** default exception to ensure a false-positive response is never returned */
  @Immutable
  static final ForceReattemptException UNHANDLED_EXCEPTION =
      (ForceReattemptException) new ForceReattemptException(
          "Unknown exception")
              .fillInStackTrace();

  int regionId;

  int processorId;

  /**
   * whether this message is being sent for listener notification
   */
  boolean notificationOnly;

  protected short flags = 0;

  /*
   * these bit masks are used for encoding the bits of a short on the wire instead of transmitting
   * booleans. Any subclasses interested in saving bits on the wire should add a mask here and then
   * override computeCompressedShort and setBooleans
   *
   */
  /** flag to indicate notification message */
  protected static final short NOTIFICATION_ONLY = DistributionMessage.UNRESERVED_FLAGS_START;
  /** flag to indicate ifNew in PutMessages */
  protected static final short IF_NEW = (NOTIFICATION_ONLY << 1);
  /** flag to indicate ifOld in PutMessages */
  protected static final short IF_OLD = (IF_NEW << 1);
  /** flag to indicate that oldValue is required for PutMessages and others */
  protected static final short REQUIRED_OLD_VAL = (IF_OLD << 1);
  /** flag to indicate filterInfo in message */
  protected static final short HAS_FILTER_INFO = (REQUIRED_OLD_VAL << 1);
  /** flag to indicate delta as value in message */
  protected static final short HAS_DELTA = (HAS_FILTER_INFO << 1);
  /** the unreserved flags start for child classes */
  protected static final short UNRESERVED_FLAGS_START = (HAS_DELTA << 1);

  private InternalDistributedMember txMemberId = null;

  /**
   * The unique transaction Id on the sending member, used to construct a TXId on the receiving side
   */
  private int txUniqId = TXManagerImpl.NOTX;

  protected boolean sendDeltaWithFullValue = true;

  /* TODO [DISTTX] Convert into flag */
  protected boolean isTransactionDistributed = false;

  public PartitionMessage() {}


  public PartitionMessage(InternalDistributedMember recipient, int regionId,
      ReplyProcessor21 processor) {
    Assert.assertTrue(recipient != null, "PartitionMesssage recipient can not be null");
    setRecipient(recipient);
    this.regionId = regionId;
    this.processorId = processor == null ? 0 : processor.getProcessorId();
    if (processor != null && isSevereAlertCompatible()) {
      processor.enableSevereAlertProcessing();
    }
    initTxMemberId();
  }

  public PartitionMessage(Collection<InternalDistributedMember> recipients, int regionId,
      ReplyProcessor21 processor) {
    setRecipients(recipients);
    this.regionId = regionId;
    this.processorId = processor == null ? 0 : processor.getProcessorId();
    if (processor != null && isSevereAlertCompatible()) {
      processor.enableSevereAlertProcessing();
    }
    initTxMemberId();
  }


  public void initTxMemberId() {
    this.txUniqId = TXManagerImpl.getCurrentTXUniqueId();
    TXStateProxy txState = TXManagerImpl.getCurrentTXState();
    if (txState != null) {
      // [DISTTX] Lets not throw this exception for Dist Tx
      if (canStartRemoteTransaction() && txState.isRealDealLocal() && !txState.isDistTx()) {
        // logger.error("sending rmt txId even though tx is local! txState=" + txState, new
        // RuntimeException("STACK");
        throw new IllegalStateException(
            "Sending remote txId even though transaction is local. This should never happen: txState="
                + txState);
      }
      txMemberId = txState.getOriginatingMember();
    }
  }

  /**
   * Copy constructor that initializes the fields declared in this class
   */
  public PartitionMessage(PartitionMessage other) {
    this.regionId = other.regionId;
    this.processorId = other.processorId;
    this.notificationOnly = other.notificationOnly;
    this.txUniqId = other.getTXUniqId();
    this.txMemberId = other.getTXOriginatorClient();
    this.isTransactionDistributed = other.isTransactionDistributed;
  }

  @Override
  public InternalDistributedMember getTXOriginatorClient() {
    return txMemberId;
  }

  @Override
  public InternalDistributedMember getMemberToMasqueradeAs() {
    if (txMemberId == null) {
      return getSender();
    }
    return txMemberId;
  }

  /**
   * Severe alert processing enables suspect processing at the ack-wait-threshold and issuing of a
   * severe alert at the end of the ack-severe-alert-threshold. Some messages should not support
   * this type of processing (e.g., GII, or DLockRequests)
   *
   * @return whether severe-alert processing may be performed on behalf of this message
   */
  @Override
  public boolean isSevereAlertCompatible() {
    return true;
  }

  @Override
  public int getProcessorType() {
    if (this.notificationOnly) {
      return OperationExecutors.SERIAL_EXECUTOR;
    } else {
      return OperationExecutors.PARTITIONED_REGION_EXECUTOR;
    }
  }

  /**
   * @return the compact value that will be sent which represents the PartitionedRegion
   * @see PartitionedRegion#getPRId()
   */
  public int getRegionId() {
    return regionId;
  }

  /**
   * @return the {@link ReplyProcessor21}id associated with the message, null if no acknowlegement
   *         is required.
   */
  @Override
  public int getProcessorId() {
    return this.processorId;
  }

  /**
   * @param processorId1 the {@link org.apache.geode.distributed.internal.ReplyProcessor21} id
   *        associated with the message, null if no acknowlegement is required.
   */
  public void registerProcessor(int processorId1) {
    this.processorId = processorId1;
  }

  /**
   * @return return the message that should be sent to listeners, or null if this message should not
   *         be relayed
   */
  public PartitionMessage getMessageForRelayToListeners(EntryEventImpl event, Set recipients) {
    return null;
  }

  /**
   * check to see if the cache is closing
   */
  public boolean checkCacheClosing(ClusterDistributionManager dm) {
    if (dm == null) {
      return true;
    }
    InternalCache cache = dm.getCache();
    return cache == null || cache.isClosed();
  }

  /**
   * check to see if the distributed system is closing
   *
   * @return true if the distributed system is closing
   */
  public boolean checkDSClosing(ClusterDistributionManager dm) {
    InternalDistributedSystem ds = dm.getSystem();
    return (ds == null || ds.isDisconnecting());
  }

  PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException {
    return PartitionedRegion.getPRFromId(this.regionId);
  }

  TXManagerImpl getTXManagerImpl(InternalCache cache) {
    return cache.getTxManager();
  }

  long getStartPartitionMessageProcessingTime(PartitionedRegion pr) {
    return pr.getPrStats().startPartitionMessageProcessing();
  }


  /**
   * Upon receipt of the message, both process the message and send an acknowledgement, not
   * necessarily in that order. Note: Any hang in this message may cause a distributed deadlock for
   * those threads waiting for an acknowledgement.
   *
   * @throws PartitionedRegionException if the region does not exist (typically, if it has been
   *         destroyed)
   */
  @Override
  public void process(final ClusterDistributionManager dm) {
    Throwable thr = null;
    boolean sendReply = true;
    PartitionedRegion pr = null;
    long startTime = 0;
    EntryLogger.setSource(getSender(), "PR");
    try {
      InternalCache cache = dm.getCache();
      if (checkCacheClosing(dm) || checkDSClosing(dm)) {
        if (cache != null) {
          thr = cache
              .getCacheClosedException(String.format("Remote cache is closed: %s",
                  dm.getId()));
        } else {
          thr = new CacheClosedException(String.format("Remote cache is closed: %s",
              dm.getId()));
        }
        return;
      }
      pr = getPartitionedRegion();
      Throwable forcedReattempt = processCheckForPR(pr, dm);
      if (forcedReattempt != null) {
        thr = forcedReattempt;
        return;
      }

      if (pr != null) {
        startTime = getStartPartitionMessageProcessingTime(pr);
      }
      thr = UNHANDLED_EXCEPTION;

      if (cache == null) {
        throw new ForceReattemptException("Remote cache is closed");
      }
      TXManagerImpl txMgr = getTXManagerImpl(cache);
      TXStateProxy tx = txMgr.masqueradeAs(this);
      if (tx == null) {
        sendReply = operateOnPartitionedRegion(dm, pr, startTime);
      } else {
        try {
          if (txMgr.isClosed()) {
            // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
            sendReply = false;
          } else if (tx.isInProgress()) {
            sendReply = operateOnPartitionedRegion(dm, pr, startTime);
            tx.updateProxyServer(this.getSender());
          } else {
            /*
             * This can occur when processing an in-flight message after the transaction has
             * been failed over and committed.
             */
            throw new TransactionException("transactional operation elided because transaction {"
                + tx.getTxId() + "} is closed");
          }
        } finally {
          txMgr.unmasquerade(tx);
        }
      }
      thr = null;

    } catch (ForceReattemptException | TransactionException e) {
      thr = e;
    } catch (DataLocationException fre) {
      thr = new ForceReattemptException(fre.getMessage(), fre);
    } catch (DistributedSystemDisconnectedException se) {
      // bug 37026: this is too noisy...
      // throw new CacheClosedException("remote system shutting down");
      // thr = se; cache is closed, no point trying to send a reply
      thr = null;
      sendReply = false;
      if (logger.isDebugEnabled()) {
        logger.debug("shutdown caught, abandoning message: {}", se.getMessage(), se);
      }
    } catch (RegionDestroyedException | RegionNotFoundException rde) {
      // [bruce] RDE does not always mean that the sender's region is also
      // destroyed, so we must send back an exception. If the sender's
      // region is also destroyed, who cares if we send it an exception
      // if (pr != null && pr.isClosed) {
      thr = new ForceReattemptException(String.format("Region is destroyed in %s",
          dm.getDistributionManagerId()), rde);
      // }
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable t) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      // log the exception at fine level if there is no reply to the message
      thr = null;
      if (sendReply) {
        if (!checkDSClosing(dm)) {
          thr = t;
        } else {
          // don't pass arbitrary runtime exceptions and errors back if this
          // cache/vm is closing
          thr = new ForceReattemptException(
              "Distributed system is disconnecting");
        }
      }
      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE) && t instanceof RuntimeException) {
        logger.trace(LogMarker.DM_VERBOSE, "Exception caught while processing message: {}",
            t.getMessage(), t);
      }
    } finally {
      if (sendReply) {
        ReplyException rex = null;

        if (thr != null) {
          // don't transmit the exception if this message was to a listener
          // and this listener is shutting down
          boolean excludeException = this.notificationOnly
              && ((thr instanceof CancelException) || (thr instanceof ForceReattemptException));

          if (!excludeException) {
            rex = new ReplyException(thr);
          }
        }

        // Send the reply if the operateOnPartitionedRegion returned true
        sendReply(getSender(), this.processorId, dm, rex, pr, startTime);
        EntryLogger.clearSource();
      }
    }
  }

  /**
   * If the PR is missing or isn't ready for use we may want to return a
   * ForceReattemptException to have the sender retry after a bit
   */
  protected Throwable processCheckForPR(PartitionedRegion pr,
      DistributionManager distributionManager) {
    if ((pr == null || !pr.getDistributionAdvisor().isInitialized()) && failIfRegionMissing()) {
      // if the distributed system is disconnecting, don't send a reply saying
      // the partitioned region can't be found (bug 36585)
      Throwable thr = new ForceReattemptException(
          String.format("%s : could not find partitioned region with Id %s",
              distributionManager.getDistributionManagerId(), regionId));
      return thr; // reply sent in finally block below
    }
    return null;
  }

  /**
   * Send a generic ReplyMessage. This is in a method so that subclasses can override the reply
   * message type
   *
   * @param pr the Partitioned Region for the message whose statistics are incremented
   * @param startTime the start time of the operation in nanoseconds
   * @see PutMessage#sendReply
   */
  protected void sendReply(InternalDistributedMember member, int procId, DistributionManager dm,
      ReplyException ex, PartitionedRegion pr, long startTime) {
    if (pr != null && startTime > 0) {
      pr.getPrStats().endPartitionMessagesProcessing(startTime);
    }

    ReplyMessage.send(member, procId, ex, getReplySender(dm), pr != null && pr.isInternalRegion());
  }

  /**
   * Allow classes that over-ride to choose whether a RegionDestroyException is thrown if no
   * partitioned region is found (typically occurs if the message will be sent before the
   * PartitionedRegion has been fully constructed.
   *
   * @return true if throwing a {@link RegionDestroyedException} is acceptable
   */
  protected boolean failIfRegionMissing() {
    return true;
  }

  /**
   * relay this message to another set of recipients for event notification
   *
   * @param cacheOpRecipients recipients of associated bucket CacheOperationMessage
   * @param adjunctRecipients recipients who unconditionally get the message
   * @param filterRoutingInfo routing information for all recipients
   * @param event the event causing this message
   * @param r the region being operated on
   * @param processor the reply processor to be notified
   */
  public Set relayToListeners(Set cacheOpRecipients, Set adjunctRecipients,
      FilterRoutingInfo filterRoutingInfo, EntryEventImpl event, PartitionedRegion r,
      DirectReplyProcessor processor) {
    this.processorId = processor == null ? 0 : processor.getProcessorId();
    this.notificationOnly = true;

    this.setFilterInfo(filterRoutingInfo);
    Set failures1 = null;
    if (!adjunctRecipients.isEmpty()) {
      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
        logger.trace(LogMarker.DM_VERBOSE,
            "Relaying partition message to other processes for listener notification");
      }
      resetRecipients();
      setRecipients(adjunctRecipients);
      failures1 = r.getDistributionManager().putOutgoing(this);
    }

    return failures1;
  }

  /**
   * return a new reply processor for this class, for use in relaying a response. This <b>must</b>
   * be an instance method so subclasses can override it properly.
   */
  PartitionResponse createReplyProcessor(PartitionedRegion r, Set recipients) {
    return new PartitionResponse(r.getSystem(), recipients);
  }


  protected boolean operateOnRegion(ClusterDistributionManager dm, PartitionedRegion pr) {
    throw new InternalGemFireError(
        "Sorry, use operateOnPartitionedRegion for PR messages");
  }

  /**
   * An operation upon the messages partitioned region which each subclassing message must implement
   *
   * @param dm the manager that received the message
   * @param pr the partitioned region that should be modified
   * @param startTime the start time of the operation
   * @return true if a reply message should be sent
   * @throws CacheException if an error is generated in the remote cache
   * @throws DataLocationException if the peer is no longer available
   */
  protected abstract boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
      PartitionedRegion pr, long startTime) throws CacheException, QueryException,
      DataLocationException, InterruptedException, IOException;

  /**
   * Fill out this instance of the message using the <code>DataInput</code> Required to be a
   * {@link org.apache.geode.DataSerializable}Note: must be symmetric with
   * {@link DataSerializableFixedID#toData(DataOutput, SerializationContext)}in what it reads
   */
  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    super.fromData(in, context);
    this.flags = in.readShort();
    setBooleans(this.flags, in, context);
    this.regionId = in.readInt();
    // extra field post 9.0
    if (InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.GFE_90) >= 0) {
      this.isTransactionDistributed = in.readBoolean();
    }
  }

  /**
   * Re-construct the booleans using the compressed short. A subclass must override this method if
   * it is using bits in the compressed short.
   */
  protected void setBooleans(short s, DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    if ((s & HAS_PROCESSOR_ID) != 0) {
      this.processorId = in.readInt();
      ReplyProcessor21.setMessageRPId(this.processorId);
    }
    if ((s & NOTIFICATION_ONLY) != 0)
      this.notificationOnly = true;
    if ((s & HAS_TX_ID) != 0)
      this.txUniqId = in.readInt();
    if ((s & HAS_TX_MEMBERID) != 0) {
      this.txMemberId = context.getDeserializer().readObject(in);
    }
  }

  /**
   * Send the contents of this instance to the DataOutput Required to be a
   * {@link org.apache.geode.DataSerializable}Note: must be symmetric with
   * {@link DataSerializableFixedID#fromData(DataInput, DeserializationContext)}in what it writes
   */
  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    super.toData(out, context);
    short compressedShort = 0;
    compressedShort = computeCompressedShort(compressedShort);
    out.writeShort(compressedShort);
    if (this.processorId != 0)
      out.writeInt(this.processorId);
    if (this.txUniqId != TXManagerImpl.NOTX)
      out.writeInt(this.txUniqId);
    if (this.txMemberId != null)
      context.getSerializer().writeObject(this.txMemberId, out);
    out.writeInt(this.regionId);
    // extra field post 9.0
    if (InternalDataSerializer.getVersionForDataStream(out).compareTo(Version.GFE_90) >= 0) {
      out.writeBoolean(this.isTransactionDistributed);
    }
  }

  /**
   * Sets the bits of a short by using the bit masks. A subclass must override this method if it is
   * using bits in the compressed short.
   *
   * @return short with appropriate bits set
   */
  protected short computeCompressedShort(short s) {
    if (this.processorId != 0)
      s |= HAS_PROCESSOR_ID;
    if (this.notificationOnly)
      s |= NOTIFICATION_ONLY;
    if (this.getTXUniqId() != TXManagerImpl.NOTX) {
      s |= HAS_TX_ID;
      if (this.txMemberId != null) {
        s |= HAS_TX_MEMBERID;
      }
    }
    return s;
  }

  public static final String PN_TOKEN = ".cache.";

  @Override
  public String toString() {
    StringBuilder buff = new StringBuilder();
    String className = getClass().getName();
    // className.substring(className.lastIndexOf('.', className.lastIndexOf('.') - 1) + 1); //
    // partition.<foo> more generic version
    buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo>
    buff.append("(prid="); // make sure this is the first one
    buff.append(this.regionId);

    // Append name, if we have it
    String name = null;
    try {
      PartitionedRegion pr = PartitionedRegion.getPRFromId(this.regionId);
      if (pr != null) {
        name = pr.getFullPath();
      }
    } catch (Exception ignore) {
      /* ignored */
      name = null;
    }
    if (name != null) {
      buff.append(" (name = \"").append(name).append("\")");
    }

    appendFields(buff);
    buff.append(" ,distTx=");
    buff.append(this.isTransactionDistributed);
    buff.append(")");
    return buff.toString();
  }

  /**
   * Helper class of {@link #toString()}
   *
   * @param buff buffer in which to append the state of this instance
   */
  protected void appendFields(StringBuilder buff) {
    buff.append(" processorId=").append(this.processorId);
    if (this.notificationOnly) {
      buff.append(" notificationOnly=").append(this.notificationOnly);
    }
    if (this.txUniqId != TXManagerImpl.NOTX) {
      buff.append(" txId=").append(this.txUniqId);
    }
    if (this.txMemberId != null) {
      buff.append(" txMemberId=").append(this.txMemberId);
    }
  }

  public InternalDistributedMember getRecipient() {
    return getRecipients()[0];
  }

  public void setOperation(Operation op) {
    // override in subclasses holding operations
  }

  /**
   * added to support old value to be written on wire.
   *
   * @param value true or false
   * @since GemFire 5.5
   */
  public void setHasOldValue(boolean value) {
    // override in subclasses which need old value to be serialized.
    // overridden by classes like PutMessage, DestroyMessage.
  }

  /**
   * added to support routing of notification-only messages to clients
   */
  public void setFilterInfo(FilterRoutingInfo filterInfo) {
    // subclasses that support routing to clients should reimplement this method
  }

  /**
   * @return the txUniqId
   */
  @Override
  public int getTXUniqId() {
    return txUniqId;
  }

  @Override
  public boolean canStartRemoteTransaction() {
    return false;
  }

  public void setSendDeltaWithFullValue(boolean bool) {
    this.sendDeltaWithFullValue = bool;
  }

  @Override
  public boolean canParticipateInTransaction() {
    return true;
  }

  protected boolean notifiesSerialGatewaySender(ClusterDistributionManager dm) {
    try {
      PartitionedRegion pr = PartitionedRegion.getPRFromId(this.regionId);
      if (pr == null) {
        return false;
      }
      return pr.notifiesSerialGatewaySender();
    } catch (PRLocallyDestroyedException ignore) {
      return false;
    } catch (RuntimeException ignore) {
      return false;
    }
  }

  /**
   * A processor on which to await a response from the {@link PartitionMessage} recipient, capturing
   * any CacheException thrown by the recipient and handle it as an expected exception.
   *
   * @since GemFire 5.0
   * @see #waitForCacheException()
   */
  public static class PartitionResponse extends DirectReplyProcessor {
    /**
     * The exception thrown when the recipient does not reply
     */
    volatile ForceReattemptException prce;

    /**
     * Whether a response has been received
     */
    volatile boolean responseReceived;

    /**
     * whether a response is required
     */
    boolean responseRequired;

    public PartitionResponse(InternalDistributedSystem dm, Set initMembers) {
      this(dm, initMembers, true);
    }

    public PartitionResponse(InternalDistributedSystem dm, Set initMembers, boolean register) {
      super(dm, initMembers);
      if (register) {
        register();
      }
    }

    public PartitionResponse(InternalDistributedSystem dm, InternalDistributedMember member) {
      this(dm, member, true);
    }

    public PartitionResponse(InternalDistributedSystem dm, InternalDistributedMember member,
        boolean register) {
      super(dm, member);
      if (register) {
        register();
      }
    }

    /**
     * require a response message to be received
     */
    public void requireResponse() {
      this.responseRequired = true;
    }

    @Override
    public void memberDeparted(DistributionManager distributionManager,
        final InternalDistributedMember id, final boolean crashed) {
      if (id != null) {
        if (removeMember(id, true)) {
          this.prce = new ForceReattemptException(
              String.format("memberDeparted event for < %s > crashed, %s",
                  id, crashed));
        }
        checkIfDone();
      } else {
        Exception e = new Exception(
            "memberDeparted got null memberId");
        logger.info(String.format("memberDeparted got null memberId crashed=%s", crashed),
            e);
      }
    }

    /**
     * Waits for the response from the {@link PartitionMessage}'s recipient
     *
     * @throws CacheException if the recipient threw a cache exception during message processing
     * @throws ForceReattemptException if the recipient left the distributed system before the
     *         response was received.
     */
    public void waitForCacheException()
        throws CacheException, ForceReattemptException, PrimaryBucketException {
      try {
        waitForRepliesUninterruptibly();
        if (this.prce != null || (this.responseRequired && !this.responseReceived)) {
          throw new ForceReattemptException(
              "Attempt failed", this.prce);
        }
      } catch (ReplyException e) {
        Throwable t = e.getCause();
        if (t instanceof CacheException) {
          throw (CacheException) t;
        } else if (t instanceof ForceReattemptException) {
          ForceReattemptException ft = (ForceReattemptException) t;
          // See FetchEntriesMessage, which can marshal a ForceReattempt
          // across to the sender
          ForceReattemptException fre = new ForceReattemptException(
              "Peer requests reattempt", t);
          if (ft.hasHash()) {
            fre.setHash(ft.getHash());
          }
          throw fre;
        } else if (t instanceof PrimaryBucketException) {
          // See FetchEntryMessage, GetMessage, InvalidateMessage,
          // PutMessage
          // which can marshal a ForceReattemptacross to the sender
          throw new PrimaryBucketException(
              "Peer failed primary test", t);
        } else if (t instanceof CancelException) {
          logger.debug(
              "PartitionResponse got CacheClosedException from {}, throwing ForceReattemptException",
              e.getSender(), t);
          throw new ForceReattemptException(
              "PartitionResponse got remote CacheClosedException",
              t);
        } else if (t instanceof DiskAccessException) {
          logger.debug(
              "PartitionResponse got DiskAccessException from {}, throwing ForceReattemptException",
              e.getSender(), t);
          throw new ForceReattemptException(
              "PartitionResponse got remote CacheClosedException",
              t);
        } else if (t instanceof LowMemoryException) {
          logger.debug("PartitionResponse re-throwing remote LowMemoryException from {}",
              e.getSender(), t);
          throw (LowMemoryException) t;
        }
        e.handleCause();
      }
    }

    /* overridden from ReplyProcessor21 */
    @Override
    public void process(DistributionMessage msg) {
      this.responseReceived = true;
      super.process(msg);
    }
  }

  @Override
  public boolean isTransactionDistributed() {
    return this.isTransactionDistributed;
  }

  /*
   * For Distributed Tx
   */
  public void setTransactionDistributed(boolean isDistTx) {
    this.isTransactionDistributed = isDistTx;
  }
}
