/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.SerialDistributionMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * This operation ensures that a particular member has seen all state changes for a Region prior to
 * a point in time. Currently this is fixed at the time the member using this operation exchanged
 * profiles with other users of the Region, and is useful only for ensuring consistency for
 * InitialImageOperation.
 *
 * StateFlushOperation works with distribution advisors and with the membership manager to flush
 * cache operations from threads to communications channels and then from the communications
 * channels to the cache of the member selected to be an initial image provider.
 *
 * To make an operation subject to StateFlushOperation you must encapsulate the message part of the
 * operation (prior to asking for distribution advice) in a try/finally block. The try/finally block
 * must work with the distribution manager like this:
 *
 * <pre>
 * try {
 *   long version = advisor.startOperation();
 *   ... get advice and write the message (dm.putOutgoing())
 *   advisor.endOperation(version);
 *   version = -1;
 *   ... wait for replies, etc.
 * } finally {
 *   if (version >= 0) {
 *     advisor.endOperation(version);
 *   }
 * }
 * </pre>
 *
 * On the receiving side the messaging system will look at the result of invoking
 * containsCacheContentChange() on the message. If the message does not return true from this
 * method then state-flush will not wait for it to be applied to the cache before GII starts.
 *
 * <pre>
 * \@Override
 * public boolean containsCacheContentChange() {
 *   return true;
 * }
 * </pre>
 *
 * The messaging infrastructure will handle the rest for you. For examples look at the uses of
 * startOperation() and endOperation(). There are some complex examples in transaction processing
 * and a more straightforward example in DistributedCacheOperation.
 *
 * @since GemFire 5.0.1
 */
public class StateFlushOperation {

  private static final Logger logger = LogService.getLogger();

  private DistributedRegion region;

  private DistributionManager dm;

  /**
   * Flushes the operations currently in progress on the given region to each of the given members.
   * <p>
   * When the region is initialized, the membership version is rolled and this method waits for
   * the region's current operations first; if the region turns out to be destroyed while waiting,
   * nothing is sent. A {@link StateStabilizationMessage} is then sent to every target, and only
   * after all of them have been sent does the method wait for the replies. If the calling thread
   * is interrupted while waiting, its interrupt status is restored and the method returns without
   * waiting for the remaining replies.
   *
   * @param targets the members to flush to
   * @param region the region whose in-progress operations are flushed
   */
  public static void flushTo(Set<InternalDistributedMember> targets, DistributedRegion region) {
    final DistributionManager distributionManager = region.getDistributionManager();
    if (region.isInitialized()) {
      // roll to a new "view" so the operations in progress right now can be tracked
      region.getDistributionAdvisor().forceNewMembershipVersion();
      try {
        region.getDistributionAdvisor().waitForCurrentOperations();
      } catch (RegionDestroyedException ignore) {
        // the region is gone - there is nothing to flush
        return;
      }
    }

    // first send every state-flush message, then wait for the replies
    final Set<ReplyProcessor21> pendingReplies = new HashSet<>();
    for (InternalDistributedMember member : targets) {
      final StateStabilizationMessage request = new StateStabilizationMessage();
      request.isSingleFlushTo = true; // plain two-party request/response, no relay involved
      request.requestingMember = distributionManager.getDistributionManagerId();
      request.setRecipient(member);
      final ReplyProcessor21 replyProcessor = new ReplyProcessor21(distributionManager, member);
      request.processorId = replyProcessor.getProcessorId();
      request.channelState = distributionManager.getDistribution().getMessageState(member, false);
      if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
        if (request.channelState != null && request.channelState.size() > 0) {
          logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "channel states: {}",
              request.channelStateDescription(request.channelState));
        }
        logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", request);
      }
      distributionManager.putOutgoing(request);
      pendingReplies.add(replyProcessor);
    }

    // test-only hook invoked between sending the requests and waiting for the replies
    if (region.getRegionMap().getARMLockTestHook() != null) {
      region.getRegionMap().getARMLockTestHook().beforeStateFlushWait();
    }

    for (ReplyProcessor21 replyProcessor : pendingReplies) {
      try {
        replyProcessor.waitForReplies();
      } catch (InterruptedException ignore) {
        // keep the interrupt visible to the caller and give up on the remaining replies
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /**
   * Creates an operation that flushes the state of a single region. Messages are sent through the
   * region's own distribution manager.
   *
   * @param r The region whose state is to be flushed
   */
  public StateFlushOperation(DistributedRegion r) {
    this(r.getDistributionManager());
    region = r;
  }

  /**
   * Creates an operation that is not tied to a particular region; a flush started from such an
   * instance is marked as applying to all regions.
   *
   * @param dm the distribution manager to use in distributing the operation
   */
  public StateFlushOperation(DistributionManager dm) {
    this.dm = dm;
  }


  /**
   * Flushes state to the given target and waits until the flush has been acknowledged.
   * <p>
   * A {@link StateMarkerMessage} naming the target as relay point is sent to the recipients (plus
   * the target itself when it is neither in the set nor this member), and the method then waits on
   * a {@link StateFlushReplyProcessor} for the replies.
   *
   * @param recipients The members who may be making state changes to the region. This is typically
   *        taken from a CacheDistributionAdvisor membership set. The set itself is not modified; a
   *        copy is made when the target has to be added.
   * @param target The member who should have all state flushed to it
   * @param processorType The execution processor type for the marker message that is sent to all
   *        members using the given region
   * @param flushNewOps normally only ops that were started before region profile exchange are
   *        flushed. Setting this to true causes the flush to wait for any started after the profile
   *        exchange as well.
   * @throws InterruptedException If the operation is interrupted, usually for shutdown, an
   *         InterruptedException will be thrown
   * @return true if the state was flushed; false if the marker could not be sent to the target or
   *         waiting for replies ended with a ReplyException
   */
  public boolean flush(Set recipients, DistributedMember target, int processorType,
      boolean flushNewOps) throws InterruptedException {

    if (Thread.interrupted()) {
      throw new InterruptedException();
    }

    final InternalDistributedMember self = this.dm.getDistributionManagerId();

    // the marker must also reach the target unless the target is this member
    Set markerRecipients = recipients;
    if (!(markerRecipients.contains(target) || self.equals(target))) {
      markerRecipients = new HashSet(recipients);
      markerRecipients.add(target);
    }

    // Partial fix for bug 38773: there is deliberately no shortcut for a region with a single
    // holder, which ensures that this cache will get both a cache op and an adjunct message when
    // creating a bucket region.
    final StateMarkerMessage marker = new StateMarkerMessage();
    marker.relayRecipient = target;
    marker.processorType = processorType;
    marker.flushNewOps = flushNewOps;
    if (region != null) {
      marker.regionPath = region.getFullPath();
    } else {
      marker.allRegions = true;
    }
    marker.setRecipients(markerRecipients);

    final StateFlushReplyProcessor replyProcessor =
        new StateFlushReplyProcessor(dm, markerRecipients, target);
    marker.processorId = replyProcessor.getProcessorId();
    if (region != null && region.isUsedForPartitionedRegionBucket()
        && region.getDistributionConfig().getAckSevereAlertThreshold() > 0) {
      marker.severeAlertEnabled = true;
      replyProcessor.enableSevereAlertProcessing();
    }

    final boolean tracing = logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
    if (tracing) {
      logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {} with processor {}", marker,
          replyProcessor);
    }

    final Set unreachable = this.dm.putOutgoing(marker);
    if (unreachable != null) {
      if (unreachable.contains(target)) {
        // without the target there is no point in waiting for anybody
        if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
          logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
              "failed to send StateMarkerMessage to target {}; returning from flush without waiting for replies",
              target);
        }
        return false;
      }
      // do not wait for members the marker never reached
      replyProcessor.messageNotSentTo(unreachable);
    }

    try {
      replyProcessor.waitForReplies();
    } catch (ReplyException re) {
      logger.warn("state flush terminated with exception", re);
      return false;
    }
    if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
      logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Finished processing {}", marker);
    }
    return true;
  }

  /**
   * This message is sent, e.g., before requesting an initial image from a single provider. It is
   * sent to all members holding the region, and has the effect of causing those members to send a
   * serial distribution message (a StateStabilizationMessage) to the image provider. The provider
   * then sends a reply message (a StateStabilizedMessage) back to the requesting process on behalf
   * of the member that sent it the StateStabilizationMessage. A provider that receives the marker
   * itself replies to the requestor directly.
   *
   * <pre>
   * requestor ----> member1 --StateStabilizationMessage--> provider --StateStabilizedMessage--> requestor
   *           ----> member2 --StateStabilizationMessage--> provider --StateStabilizedMessage--> requestor
   *           ----> provider --StateStabilizedMessage--> requestor
   * </pre>
   *
   * This flushes the ordered messages in flight between members and the gii provider, so we don't
   * miss data when the image is requested.
   *
   * @since GemFire 5.0.1
   * @see StateFlushOperation.StateStabilizationMessage
   * @see StateFlushOperation.StateStabilizedMessage
   *
   */
  public static class StateMarkerMessage extends DistributionMessage implements MessageWithReply {
    /**
     * roll the membership version to force flushing of new ops.
     * <p>
     * NOTE(review): toData/fromData in this class do not write or read this flag, so a
     * deserialized copy appears to always carry the default value - confirm whether that is
     * intended.
     */
    public boolean flushNewOps;
    /** the member acting as the relay point */
    protected DistributedMember relayRecipient;
    /** the reply processor identity */
    protected int processorId;
    /** the type of executor to use */
    protected int processorType;
    /** the target region's full path */
    protected String regionPath;
    /** the associated Region, resolved lazily from regionPath on the receiving side */
    protected DistributedRegion region;
    /** whether to enable severe alert processing; not part of the serialized form */
    protected transient boolean severeAlertEnabled;
    /**
     * whether all regions must be flushed to the relay target. If this is true, then regionPath may
     * be null.
     */
    protected boolean allRegions;

    public StateMarkerMessage() {}

    @Override
    public int getProcessorId() {
      return processorId;
    }

    @Override
    public int getProcessorType() {
      return this.processorType;
    }

    /**
     * Returns the region named by regionPath, looking it up in the cache on first use. The result
     * is null when no DistributedRegion is found at that path.
     */
    private DistributedRegion getRegion(ClusterDistributionManager dm) {
      if (region == null) {
        // the lookup runs with the thread's init-level requirement set to BEFORE_INITIAL_IMAGE;
        // the previous requirement is restored afterwards
        final InitializationLevel previousLevel =
            LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
        try {
          Region candidate = dm.getExistingCache().getRegionByPathForProcessing(this.regionPath);
          if (candidate instanceof DistributedRegion) {
            region = (DistributedRegion) candidate;
          }
        } finally {
          LocalRegion.setThreadInitLevelRequirement(previousLevel);
        }
      }
      return region;
    }

    /** returns a set of all DistributedRegions for allRegions processing */
    private Set<DistributedRegion> getAllRegions(ClusterDistributionManager dm) {
      final InitializationLevel previousLevel =
          LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
      try {
        final Set<DistributedRegion> distributedRegions = new HashSet<>();
        for (InternalRegion r : dm.getExistingCache().getAllRegions()) {
          if (!(r instanceof DistributedRegion)) {
            continue;
          }
          // it's important not to check if the cache is closing, so access
          // the isDestroyed boolean directly
          if (((LocalRegion) r).isDestroyed) {
            continue;
          }
          distributedRegions.add((DistributedRegion) r);
        }
        return distributedRegions;
      } finally {
        LocalRegion.setThreadInitLevelRequirement(previousLevel);
      }
    }

    /**
     * Handles the marker on the receiving member. The relay target answers the sender directly;
     * every other member forwards a StateStabilizationMessage to the relay target.
     */
    @Override
    protected void process(ClusterDistributionManager dm) {
      logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Processing {}", this);
      if (dm.getDistributionManagerId().equals(relayRecipient)) {
        processAsRelayTarget(dm);
      } else {
        processAsRelaySource(dm);
      }
    }

    /**
     * This member is the relay target: wait for the current operations of the affected regions
     * and then, whatever happened, acknowledge to the sender of the marker.
     */
    private void processAsRelayTarget(ClusterDistributionManager dm) {
      try {
        // wait for inflight operations to the aeqs even if the recipient is the primary
        for (DistributedRegion r : getRegions(dm)) {
          if (r == null) {
            continue;
          }
          if (this.allRegions && r.doesNotDistribute()) {
            // no need to flush a region that does no distribution
            continue;
          }
          waitForCurrentOperations(r, r.isInitialized());
        }
      } catch (CancelException ignore) {
        // cache is closed - no distribution advisor available for the region so nothing to do but
        // send the stabilization message
      } catch (Exception e) {
        logger.fatal(String.format("%s Exception caught while determining channel state",
            this),
            e);
      } finally {
        // no need to send a relay request to this process - just send the
        // ack back to the sender
        final StateStabilizedMessage ack = new StateStabilizedMessage();
        ack.sendingMember = relayRecipient;
        ack.setRecipient(this.getSender());
        ack.setProcessorId(processorId);
        if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
          logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", ack);
        }
        dm.putOutgoing(ack);
      }
    }

    /**
     * This member is not the relay target:
     * <ol>
     * <li>wait for all messages based on the membership version (or older) at which the sender
     * "joined" this region to be put on the pipe</li>
     * <li>record the state of all communication channels from this process to the relay point</li>
     * <li>send a stabilization message to the relay point that holds the communication channel
     * state information</li>
     * </ol>
     * The stabilization message is sent even when the first two steps fail.
     */
    private void processAsRelaySource(ClusterDistributionManager dm) {
      final StateStabilizationMessage relayRequest = new StateStabilizationMessage();
      relayRequest.setRecipient((InternalDistributedMember) relayRecipient);
      relayRequest.requestingMember = this.getSender();
      relayRequest.processorId = processorId;
      try {
        for (DistributedRegion r : getRegions(dm)) {
          if (r == null) {
            if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
              logger.trace(LogMarker.DM_VERBOSE,
                  "Region not found - skipping channel state assessment");
            }
            continue;
          }
          if (this.allRegions && r.doesNotDistribute()) {
            // no need to flush a region that does no distribution
            continue;
          }
          final boolean initialized = r.isInitialized();
          waitForCurrentOperations(r, initialized);
          final boolean useMulticast =
              r.getMulticastEnabled() && r.getSystem().getConfig().getMcastPort() != 0;
          if (!initialized) {
            continue;
          }
          final Map channelStates =
              dm.getDistribution().getMessageState(relayRecipient, useMulticast);
          if (relayRequest.channelState == null) {
            relayRequest.channelState = channelStates;
          } else {
            // several regions contribute to one message: merge their channel states
            relayRequest.channelState.putAll(channelStates);
          }
          if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)
              && relayRequest.channelState != null && relayRequest.channelState.size() > 0) {
            logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "channel states: {}",
                relayRequest.channelStateDescription(relayRequest.channelState));
          }
        }
      } catch (CancelException ignore) {
        // cache is closed - no distribution advisor available for the region so nothing to do but
        // send the stabilization message
      } catch (Exception e) {
        logger.fatal(String.format("%s Exception caught while determining channel state",
            this),
            e);
      } finally {
        if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
          logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", relayRequest);
        }
        dm.putOutgoing(relayRequest);
      }
    }

    /**
     * Waits for the current operations of an initialized region; does nothing for a region that
     * is not initialized. A region that is destroyed while waiting is silently skipped.
     */
    private void waitForCurrentOperations(final DistributedRegion r, final boolean initialized) {
      if (!initialized) {
        return;
      }
      if (this.flushNewOps) {
        // force a new "view" so we can track current ops
        r.getDistributionAdvisor().forceNewMembershipVersion();
      }
      try {
        r.getDistributionAdvisor().waitForCurrentOperations();
      } catch (RegionDestroyedException e) {
        // continue with the next region
      }
    }

    /**
     * Returns the regions this message applies to: every live DistributedRegion when allRegions
     * is set, otherwise a singleton holding the region at regionPath (whose element may be null).
     */
    private Set<DistributedRegion> getRegions(final ClusterDistributionManager dm) {
      if (this.allRegions) {
        return getAllRegions(dm);
      }
      return Collections.singleton(this.getRegion(dm));
    }

    /**
     * Writes relayRecipient, processorId, processorType and allRegions, followed by regionPath
     * only when allRegions is false.
     */
    @Override
    public void toData(DataOutput dout,
        SerializationContext context) throws IOException {
      super.toData(dout, context);
      DataSerializer.writeObject(relayRecipient, dout);
      dout.writeInt(processorId);
      dout.writeInt(processorType);
      dout.writeBoolean(allRegions);
      if (allRegions) {
        return; // no region path on the wire when every region is flushed
      }
      DataSerializer.writeString(regionPath, dout);
    }

    @Override
    public int getDSFID() {
      return STATE_MARKER_MESSAGE;
    }

    /** Reads the fields in the order in which toData wrote them. */
    @Override
    public void fromData(DataInput din,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      super.fromData(din, context);
      relayRecipient = DataSerializer.readObject(din);
      processorId = din.readInt();
      processorType = din.readInt();
      allRegions = din.readBoolean();
      if (allRegions) {
        return;
      }
      regionPath = DataSerializer.readString(din);
    }

    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder();
      text.append("StateMarkerMessage(requestingMember=").append(this.getSender());
      text.append(",processorId=").append(processorId);
      text.append(",target=").append(relayRecipient);
      text.append(",region=").append(regionPath);
      text.append(")");
      return text.toString();
    }

    @Override
    public boolean isSevereAlertCompatible() {
      return severeAlertEnabled;
    }

  }

  /**
   * StateStabilizationMessage is sent by a distributed member to a member who is the target of a
   * state flush. Once the target has seen the channel state carried by this message, it sends a
   * StateStabilizedMessage to the member recorded in {@code requestingMember}.
   * <p>
   * author bruce
   *
   * @see StateFlushOperation.StateStabilizedMessage
   * @see StateFlushOperation.StateMarkerMessage
   * @since GemFire 5.0.1
   */
  public static class StateStabilizationMessage extends SerialDistributionMessage {
    /** the member that requested StateStabilizedMessages */
    protected DistributedMember requestingMember;
    /** the processor id for the requesting member */
    protected int processorId;
    /**
     * a map of the communication channel state between the sending process and the receiving
     * process
     */
    protected Map channelState;
    /** whether this is a simple request/response two-party flush or (false) a proxied flush */
    protected boolean isSingleFlushTo;

    public StateStabilizationMessage() {
      super();
    }

    /**
     * Renders a channel-state map as {@code key=value} pairs separated by {@code ", "} for trace
     * logging.
     *
     * @param state the channel state to describe; expected to be a Map
     * @return the rendered pairs, or a fixed placeholder text when state is not a Map
     */
    public String channelStateDescription(Object state) {
      if (!(state instanceof Map)) {
        return "unknown channelState content";
      } else {
        Map csmap = (Map) state;
        StringBuilder result = new StringBuilder(200);
        for (Iterator it = csmap.entrySet().iterator(); it.hasNext();) {
          Map.Entry entry = (Map.Entry) it.next();
          result.append(entry.getKey()).append('=').append(entry.getValue());
          if (it.hasNext()) {
            result.append(", ");
          }
        }
        return result.toString();
      }
    }

    /**
     * Waits, on the waiting thread pool, until the channel state carried by this message has been
     * reached and then sends a StateStabilizedMessage to the requesting member. The reply is sent
     * from a finally block, so it goes out even when waiting fails.
     */
    @Override
    protected void process(final ClusterDistributionManager dm) {
      // though this message must be transmitted on an ordered connection to
      // ensure that datagram channels are flushed, we need to execute
      // in the waiting pool to avoid blocking those connections
      dm.getExecutors().getWaitingThreadPool().execute(new Runnable() {
        @Override
        public void run() {
          if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
            // a bare 'this' would be the anonymous Runnable here, so name the message explicitly
            logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Processing {}",
                StateStabilizationMessage.this);
          }
          try {
            if (channelState != null) {
              if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)
                  && (channelState.size() > 0)) {
                logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Waiting for channel states:  {}",
                    channelStateDescription(channelState));
              }
              // retry the wait when interrupted; the interrupt status is restored after every
              // attempt and cancellation is checked before each one
              for (;;) {
                dm.getCancelCriterion().checkCancelInProgress(null);
                boolean interrupted = Thread.interrupted();
                try {
                  dm.getDistribution().waitForMessageState(getSender(), channelState);
                  break;
                } catch (InterruptedException ignore) {
                  interrupted = true;
                } finally {
                  if (interrupted) {
                    Thread.currentThread().interrupt();
                  }
                }
              } // for
            }
          } catch (ThreadDeath td) {
            throw td;
          } catch (VirtualMachineError err) {
            SystemFailure.initiateFailure(err);
            // If this ever returns, rethrow the error. We're poisoned
            // now, so don't let this thread continue.
            throw err;
          } catch (Throwable e) {
            // Whenever you catch Error or Throwable, you must also
            // catch VirtualMachineError (see above). However, there is
            // _still_ a possibility that you are dealing with a cascading
            // error condition, so you also need to check to see if the JVM
            // is still usable:
            SystemFailure.checkFailure();
            logger.fatal("Exception caught while waiting for channel state",
                e);
          } finally {
            StateStabilizedMessage ga = new StateStabilizedMessage();
            ga.setRecipient((InternalDistributedMember) requestingMember);
            if (isSingleFlushTo) {
              // not a proxied message but a simple request-response
              ga.sendingMember = dm.getDistributionManagerId();
            } else {
              // proxied flush: reply on behalf of the member that sent this message
              ga.sendingMember = getSender();
            }
            ga.setProcessorId(processorId);
            if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
              logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", ga);
            }
            if (requestingMember.equals(dm.getDistributionManagerId())) {
              // the requestor is this member: process the reply locally instead of sending it
              ga.dmProcess(dm);
            } else {
              dm.putOutgoing(ga);
            }
          }
        }
      });
    }

    /** Writes processorId, channelState, requestingMember and isSingleFlushTo, in that order. */
    @Override
    public void toData(DataOutput dout,
        SerializationContext context) throws IOException {
      super.toData(dout, context);
      dout.writeInt(processorId);
      DataSerializer.writeHashMap(channelState, dout);
      DataSerializer.writeObject(requestingMember, dout);
      dout.writeBoolean(this.isSingleFlushTo);
    }

    @Override
    public int getDSFID() {
      return STATE_STABILIZATION_MESSAGE;
    }

    /** Reads the fields in the order in which toData wrote them. */
    @Override
    public void fromData(DataInput din,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      super.fromData(din, context);
      processorId = din.readInt();
      channelState = DataSerializer.readHashMap(din);
      requestingMember = (DistributedMember) DataSerializer.readObject(din);
      this.isSingleFlushTo = din.readBoolean();
    }

    @Override
    public String toString() {
      return "StateStabilizationMessage(recipients=" + getRecipientsDescription()
          + ",requestingMember=" + requestingMember + ",processorId=" + processorId + ")";
    }
  }

  /**
   * StateStabilizedMessage is sent from a VM that will provide an initial image and is part of a
   * higher-order protocol that is intended to force data in serial execution queues to be processed
   * before the initial image is requested. It is a reply message whose reported sender is the
   * member on whose behalf the acknowledgement is made.
   * <p>
   * author bruce
   *
   * @see StateFlushOperation.StateMarkerMessage
   * @see StateFlushOperation.StateStabilizationMessage
   * @since GemFire 5.0.1
   *
   */
  public static class StateStabilizedMessage extends ReplyMessage {
    /** the member for whom this ack is being sent */
    protected DistributedMember sendingMember;

    public StateStabilizedMessage() {}

    /**
     * Reports {@code sendingMember} instead of the real sender, i.e. the source of the message is
     * spoofed.
     */
    @Override
    public InternalDistributedMember getSender() {
      return (InternalDistributedMember) sendingMember;
    }

    /** Traces the message when verbose state-flush logging is on, then processes it as usual. */
    @Override
    public void process(final DistributionManager dm, final ReplyProcessor21 processor) {
      final boolean tracing = logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
      if (tracing) {
        logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Processing {}", this);
      }
      super.process(dm, processor);
    }

    /** Appends sendingMember to the serialized form of the reply message. */
    @Override
    public void toData(DataOutput dout,
        SerializationContext context) throws IOException {
      super.toData(dout, context);
      DataSerializer.writeObject(this.sendingMember, dout);
    }

    @Override
    public int getDSFID() {
      return STATE_STABILIZED_MESSAGE;
    }

    /** Reads sendingMember after the fields of the reply message. */
    @Override
    public void fromData(DataInput din,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      super.fromData(din, context);
      this.sendingMember = (DistributedMember) DataSerializer.readObject(din);
    }

    /**
     * Describes the message using the real sender (not the spoofed one), the recipients, the
     * member the ack is sent for and any attached exception.
     */
    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder();
      text.append("StateStabilizedMessage ").append(this.processorId);
      if (super.getSender() != null) {
        text.append(" from ").append(super.getSender());
      }
      if (!getRecipients().isEmpty()) {
        final String recipientsText = getRecipientsDescription();
        text.append(" to ").append(recipientsText);
      }
      text.append(" on behalf of ").append(sendingMember);
      final ReplyException failure = this.getException();
      if (failure != null) {
        text.append(" with exception ").append(failure);
      }
      return text.toString();
    }
  }

  /**
   * StateFlushReplyProcessor waits for proxy acks (StateStabilizedMessages) from the target vm. As
   * soon as the target vm is seen to have left, the processor stops waiting.
   */
  public static class StateFlushReplyProcessor extends ReplyProcessor21 {

    /** the target of the StateFlushOperation */
    InternalDistributedMember targetMember;

    /** the number of members replies were initially expected from; used by toString only */
    int originalCount;

    /** whether the target member has left the distributed system */
    volatile boolean targetMemberHasLeft;

    /**
     * @param manager the distribution manager used for membership checks
     * @param initMembers the members replies are expected from
     * @param target the member state is being flushed to
     */
    public StateFlushReplyProcessor(DistributionManager manager, Set initMembers,
        DistributedMember target) {
      super(manager, initMembers);
      targetMember = (InternalDistributedMember) target;
      originalCount = initMembers.size();
      // bug #43583 - perform an initial membership check
      final boolean alreadyGone = targetMemberHasLeft || !manager.isCurrentMember(targetMember);
      targetMemberHasLeft = alreadyGone;
    }

    /** process the failure set from sending the message */
    public void messageNotSentTo(Set failures) {
      // a member the message never reached is handled as if it had crashed
      for (Object unreachable : failures) {
        memberDeparted(null, (InternalDistributedMember) unreachable, true);
      }
    }

    /** Records the departure of the target member before the regular departure handling runs. */
    @Override
    public void memberDeparted(DistributionManager distributionManager,
        final InternalDistributedMember id, final boolean crashed) {
      final boolean targetIsLeaving = id.equals(targetMember);
      if (targetIsLeaving) {
        targetMemberHasLeft = true;
      }
      super.memberDeparted(distributionManager, id, crashed);
    }

    /** Marks the target as departed when it is missing from the given active members. */
    @Override
    protected void processActiveMembers(Set activeMembers) {
      super.processActiveMembers(activeMembers);
      if (activeMembers.contains(targetMember)) {
        return;
      }
      targetMemberHasLeft = true;
    }

    @VisibleForTesting
    public boolean getTargetMemberHasLeft() {
      return targetMemberHasLeft;
    }

    /**
     * Re-checks the membership of the target on every call; waiting continues only while replies
     * are outstanding and the target is still a member.
     */
    @Override
    protected boolean stillWaiting() {
      final boolean targetGone =
          targetMemberHasLeft || !getDistributionManager().isCurrentMember(targetMember);
      targetMemberHasLeft = targetGone;
      return super.stillWaiting() && !targetMemberHasLeft;
    }

    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder();
      text.append("<").append(shortName()).append(" ").append(this.getProcessorId());
      text.append(" targeting ").append(targetMember);
      text.append(" waiting for ").append(numMembers());
      text.append(" replies out of ").append(this.originalCount).append(" ");
      text.append(exception == null ? "" : (" exception: " + exception));
      text.append(" from ").append(membersToString());
      text.append(">");
      return text.toString();
    }
  }
}
