/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.util.ObjectSizer;
import org.apache.geode.distributed.internal.CacheTime;
import org.apache.geode.internal.cache.versions.CompactVersionHolder;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Tombstones are region entries that have been destroyed but are held for future concurrency
 * checks. They are timed out after a reasonable period of time when there is no longer the
 * possibility of concurrent modification conflicts.
 * <p>
 * The cache holds a tombstone service that is responsible for tracking and timing out tombstones.
 */
public class TombstoneService {
  private static final Logger logger = LogService.getLogger();

  @VisibleForTesting
  public static final long REPLICATE_TOMBSTONE_TIMEOUT_DEFAULT =
      Long.getLong(GEMFIRE_PREFIX + "tombstone-timeout", 600000L);

  /**
   * The default tombstone expiration period, in milliseconds for replicates and partitions.
   * <p>
   * This is the period over which the destroy operation may conflict with another operation. After
   * this timeout elapses the tombstone is put into a GC set for removal. Removal is typically
   * triggered by the size of the GC set, but could be influenced by resource managers.
   *
   * The default is 600,000 milliseconds (10 minutes).
   */
  @MutableForTesting
  public static long REPLICATE_TOMBSTONE_TIMEOUT = REPLICATE_TOMBSTONE_TIMEOUT_DEFAULT;

  @VisibleForTesting
  public static final long NON_REPLICATE_TOMBSTONE_TIMEOUT_DEFAULT =
      Long.getLong(GEMFIRE_PREFIX + "non-replicated-tombstone-timeout", 480000);

  /**
   * The default tombstone expiration period in millis for non-replicate/partition regions. This
   * tombstone timeout should be shorter than the one for replicated regions and need not be
   * excessively long. Making it longer than the replicated timeout can cause non-replicated regions
   * to issue revisions based on the tombstone that could overwrite modifications made by others
   * that no longer have the tombstone.
   * <p>
   * The default is 480,000 milliseconds (8 minutes)
   */
  @MutableForTesting
  public static long NON_REPLICATE_TOMBSTONE_TIMEOUT = NON_REPLICATE_TOMBSTONE_TIMEOUT_DEFAULT;

  @VisibleForTesting
  public static final int EXPIRED_TOMBSTONE_LIMIT_DEFAULT =
      Integer.getInteger(GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);

  /**
   * The max number of tombstones in an expired batch. This covers all replicated regions, including
   * PR buckets. The default is 100,000 expired tombstones.
   */
  @MutableForTesting
  public static int EXPIRED_TOMBSTONE_LIMIT = EXPIRED_TOMBSTONE_LIMIT_DEFAULT;

  @VisibleForTesting
  public static final long DEFUNCT_TOMBSTONE_SCAN_INTERVAL_DEFAULT =
      Long.getLong(GEMFIRE_PREFIX + "tombstone-scan-interval", 60000);

  /**
   * The interval to scan for expired tombstones in the queues
   */
  public static final long DEFUNCT_TOMBSTONE_SCAN_INTERVAL =
      DEFUNCT_TOMBSTONE_SCAN_INTERVAL_DEFAULT;

  @VisibleForTesting
  public static final double GC_MEMORY_THRESHOLD_DEFAULT =
      Integer.getInteger(GEMFIRE_PREFIX + "tombstone-gc-memory-threshold", 30) * 0.01;

  /**
   * The threshold percentage of free max memory that will trigger tombstone GCs. The default
   * percentage is somewhat less than the LRU Heap evictor so that we evict tombstones before we
   * start evicting cache data.
   */
  @MutableForTesting
  public static double GC_MEMORY_THRESHOLD = GC_MEMORY_THRESHOLD_DEFAULT;

  @VisibleForTesting
  public static final boolean FORCE_GC_MEMORY_EVENTS_DEFAULT = false;

  /** this is a test hook for causing the tombstone service to act as though free memory is low */
  @MutableForTesting
  public static boolean FORCE_GC_MEMORY_EVENTS = FORCE_GC_MEMORY_EVENTS_DEFAULT;

  @VisibleForTesting
  public static final long MAX_SLEEP_TIME_DEFAULT = 10000;

  /** maximum time a sweeper will sleep, in milliseconds. */
  @MutableForTesting
  public static long MAX_SLEEP_TIME = MAX_SLEEP_TIME_DEFAULT;

  @VisibleForTesting
  public static final boolean IDLE_EXPIRATION_DEFAULT = false;

  /** dunit test hook for forced batch expiration */
  @MutableForTesting
  public static boolean IDLE_EXPIRATION = IDLE_EXPIRATION_DEFAULT;

  /**
   * two sweepers, one for replicated regions (including PR buckets) and one for other regions. They
   * have different timeout intervals.
   */
  private final ReplicateTombstoneSweeper replicatedTombstoneSweeper;
  private final NonReplicateTombstoneSweeper nonReplicatedTombstoneSweeper;

  public static TombstoneService initialize(InternalCache cache) {
    return new TombstoneService(cache);
  }

  private TombstoneService(InternalCache cache) {
    replicatedTombstoneSweeper =
        new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(),
            cache.getDistributionManager().getExecutors().getWaitingThreadPool());
    nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache,
        cache.getCachePerfStats(), cache.getCancelCriterion());
    replicatedTombstoneSweeper.start();
    nonReplicatedTombstoneSweeper.start();
  }

  /**
   * this ensures that the background sweeper thread is stopped
   */
  public void stop() {
    replicatedTombstoneSweeper.stop();
    nonReplicatedTombstoneSweeper.stop();
  }

  /**
   * Tombstones are markers placed in destroyed entries in order to keep the entry around for a
   * while so that it's available for concurrent modification detection.
   *
   * @param r the region holding the entry
   * @param entry the region entry that holds the tombstone
   * @param destroyedVersion the version that was destroyed
   */
  public void scheduleTombstone(LocalRegion r, RegionEntry entry, VersionTag destroyedVersion) {
    if (entry.getVersionStamp() == null) {
      logger.warn(
          "Detected an attempt to schedule a tombstone for an entry that is not versioned in region "
              + r.getFullPath(),
          new Exception("stack trace"));
      return;
    }
    Tombstone ts = new Tombstone(entry, r, destroyedVersion);
    getSweeper(r).scheduleTombstone(ts);
  }


  public TombstoneSweeper getSweeper(LocalRegion r) {
    if (r.getScope().isDistributed() && r.getServerProxy() == null
        && r.getDataPolicy().withReplication()) {
      return replicatedTombstoneSweeper;
    } else {
      return nonReplicatedTombstoneSweeper;
    }
  }

  /**
   * remove all tombstones for the given region. Do this when the region is cleared or destroyed.
   */
  public void unscheduleTombstones(LocalRegion r) {
    getSweeper(r).unscheduleTombstones(r);
  }

  public int getGCBlockCount() {
    return replicatedTombstoneSweeper.getGCBlockCount();
  }

  public int incrementGCBlockCount() {
    return replicatedTombstoneSweeper.incrementGCBlockCount();
  }

  public int decrementGCBlockCount() {
    return replicatedTombstoneSweeper.decrementGCBlockCount();
  }

  public long getScheduledTombstoneCount() {
    long result = 0;
    result += replicatedTombstoneSweeper.getScheduledTombstoneCount();
    result += nonReplicatedTombstoneSweeper.getScheduledTombstoneCount();
    return result;
  }

  /**
   * remove tombstones from the given region that have region-versions <= those in the given removal
   * map
   *
   * @return a collection of keys removed (only if the region is a bucket - empty otherwise)
   */
  @SuppressWarnings("rawtypes")
  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource<?>, Long> regionGCVersions,
      boolean needsKeys) {
    synchronized (getBlockGCLock()) {
      int count = getGCBlockCount();
      if (count > 0) {
        // if any delta GII is on going as provider at this member, not to do tombstone GC
        if (logger.isDebugEnabled()) {
          logger.debug("gcTombstones skipped due to {} Delta GII on going", count);
        }
        return null;
      }
      if (logger.isDebugEnabled()) {
        logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
      }
      final VersionSource myId = r.getVersionMember();
      final TombstoneSweeper sweeper = getSweeper(r);
      final List<Tombstone> removals = new ArrayList<>();
      sweeper.removeUnexpiredIf(t -> {
        if (t.region == r) {
          VersionSource destroyingMember = t.getMemberID();
          if (destroyingMember == null) {
            destroyingMember = myId;
          }
          Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
          if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV) {
            removals.add(t);
            return true;
          }
        }
        return false;
      });

      // Record the GC versions now, so that we can persist them
      for (Map.Entry<VersionSource<?>, Long> entry : regionGCVersions.entrySet()) {
        r.getVersionVector().recordGCVersion(entry.getKey(), entry.getValue());
      }

      // Remove any exceptions from the RVV that are older than the GC version
      r.getVersionVector().pruneOldExceptions();

      // Persist the GC RVV to disk. This needs to happen BEFORE we remove
      // the entries from map, to prevent us from removing a tombstone
      // from disk that has a version greater than the persisted
      // GV RVV.
      if (r.getDataPolicy().withPersistence()) {
        // Update the version vector which reflects what has been persisted on disk.
        r.getDiskRegion().writeRVVGC(r);
      }

      Set<Object> removedKeys = needsKeys ? new HashSet<>() : Collections.emptySet();
      for (Tombstone t : removals) {
        boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t);
        if (needsKeys && tombstoneWasStillInRegionMap) {
          removedKeys.add(t.entry.getKey());
        }
      }
      return removedKeys;
    } // sync on deltaGIILock
  }

  /**
   * client tombstone removal is key-based if the server is a PR. This is due to the server having
   * separate version vectors for each bucket. In the client this causes the version vector to make
   * no sense, so we have to send it a collection of the keys removed on the server and then we
   * brute-force remove any of them that are tombstones on the client
   *
   * @param r the region affected
   * @param tombstoneKeys the keys removed on the server
   */
  public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
    if (r.getServerProxy() == null) {
      // if the region does not have a server proxy
      // then it will not have any tombstones to gc for the server.
      return;
    }
    if (logger.isDebugEnabled()) {
      logger.debug("gcTombstoneKeys invoked for region {} and keys {}", r, tombstoneKeys);
    }
    final TombstoneSweeper sweeper = getSweeper(r);
    final List<Tombstone> removals = new ArrayList<>(tombstoneKeys.size());
    sweeper.removeUnexpiredIf(t -> {
      if (t.region == r) {
        if (tombstoneKeys.contains(t.entry.getKey())) {
          removals.add(t);
          return true;
        }
      }
      return false;
    });

    for (Tombstone t : removals) {
      // TODO - RVV - to support persistent client regions
      // we need to actually record this as a destroy on disk, because
      // the GCC RVV doesn't make sense on the client.
      t.region.getRegionMap().removeTombstone(t.entry, t);
    }
  }

  /**
   * For test purposes only, force the expiration of a number of tombstones for replicated regions.
   *
   * @param count Number of tombstones to expire
   *
   * @return true if the expiration occurred
   */
  public boolean forceBatchExpirationForTests(int count) throws InterruptedException {
    return replicatedTombstoneSweeper.testHook_forceExpiredTombstoneGC(count, 30, SECONDS);
  }

  /**
   * For test purposes only, force the expiration of a number of tombstones for replicated regions.
   *
   * @param count Number of tombstones to expire
   * @param timeout the maximum time to wait
   * @param unit the time unit of the {@code timeout} argument
   *
   * @return true if the expiration occurred
   */
  public boolean forceBatchExpirationForTests(int count, long timeout, TimeUnit unit)
      throws InterruptedException {
    return replicatedTombstoneSweeper.testHook_forceExpiredTombstoneGC(count, timeout, unit);
  }

  @Override
  public String toString() {
    return "Destroyed entries GC service.  Replicate Queue=" + replicatedTombstoneSweeper
        + " Non-replicate Queue=" + nonReplicatedTombstoneSweeper;
  }

  public Object getBlockGCLock() {
    return replicatedTombstoneSweeper.getBlockGCLock();
  }

  @VisibleForTesting
  public static class Tombstone extends CompactVersionHolder {
    // tombstone overhead size
    public static final int PER_TOMBSTONE_OVERHEAD =
        ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
            + ReflectionSingleObjectSizer.REFERENCE_SIZE * 3 // entry, region, member ID
            + ReflectionSingleObjectSizer.REFERENCE_SIZE // region entry value (Token.TOMBSTONE)
            + 18; // version numbers and timestamp


    RegionEntry entry;
    LocalRegion region;

    @VisibleForTesting
    public Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
      super(destroyedVersion);
      this.entry = entry;
      this.region = region;
    }

    public int getSize() {
      return Tombstone.PER_TOMBSTONE_OVERHEAD // includes per-entry overhead
          + ObjectSizer.DEFAULT.sizeof(entry.getKey());
    }

    @Override
    public String toString() {
      String v = super.toString();
      return "(" + entry.getKey() + "; " + region.getName() + "; "
          + v + ")";
    }
  }
  private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
    NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats,
        CancelCriterion cancelCriterion) {
      super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT,
          "Non-replicate Region Garbage Collector");
    }

    @Override
    protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
      return false;
    }

    @Override
    protected void updateStatistics() {
      stats.setNonReplicatedTombstonesSize(getMemoryEstimate());
    }

    @Override
    protected boolean hasExpired(long msUntilTombstoneExpires) {
      /*
       * In case the tombstone expiration time would be too far out lets cap it. This is just
       * making the system fault tolerant in the case that there are large clock jumps or
       * unrealistically large timestamps.
       */
      return msUntilTombstoneExpires <= 0 || msUntilTombstoneExpires > EXPIRY_TIME;
    }

    @Override
    protected void expireTombstone(Tombstone tombstone) {
      if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
        logger.trace(LogMarker.TOMBSTONE_VERBOSE, "removing expired tombstone {}", tombstone);
      }
      updateMemoryEstimate(-tombstone.getSize());
      tombstone.region.getRegionMap().removeTombstone(tombstone.entry, tombstone);
    }

    @Override
    protected void checkExpiredTombstoneGC() {}

    @Override
    protected void handleNoUnexpiredTombstones() {}

    @Override
    boolean testHook_forceExpiredTombstoneGC(int count, long timeout, TimeUnit unit)
        throws InterruptedException {
      return true;
    }

    @Override
    protected void beforeSleepChecks() {}

    @Override
    public long getOldestTombstoneTime() {
      long result = 0;
      if (tombstones.peek() != null) {
        result = tombstones.peek().getVersionTimeStamp();
      }
      return result;
    }

    @Override
    public String getOldestTombstone() {
      String result = null;
      if (tombstones.peek() != null) {
        result = tombstones.peek().toString();
      }
      return result;
    }
  }

  protected static class ReplicateTombstoneSweeper extends TombstoneSweeper {
    /**
     * Used to execute batch gc message execution in the background.
     */
    private final ExecutorService executor;
    /**
     * tombstones that have expired and are awaiting batch removal.
     */
    private final List<Tombstone> expiredTombstones;
    private final Object expiredTombstonesLock = new Object();

    /**
     * Force batch expiration
     */
    private boolean forceBatchExpiration = false;

    /**
     * Is a batch expiration in progress? Part of expireBatch is done in a background thread and
     * until that completes batch expiration is in progress.
     */
    private volatile boolean batchExpirationInProgress;

    private final Object blockGCLock = new Object();
    private int progressingDeltaGIICount;

    /**
     * A test hook to force a call to expireBatch. The call will only happen after
     * testHook_forceExpirationCount goes to zero. This latch is counted down at the end of
     * expireBatch. See @{link {@link TombstoneService#forceBatchExpirationForTests(int)}
     */
    private CountDownLatch testHook_forceBatchExpireCall;
    /**
     * count of tombstones to forcibly expire
     */
    private int testHook_forceExpirationCount = 0;

    ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats,
        CancelCriterion cancelCriterion, ExecutorService executor) {
      super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT,
          "Replicate/Partition Region Garbage Collector");
      expiredTombstones = new ArrayList<>();
      this.executor = executor;
    }

    public int decrementGCBlockCount() {
      synchronized (getBlockGCLock()) {
        return --progressingDeltaGIICount;
      }
    }

    public int incrementGCBlockCount() {
      synchronized (getBlockGCLock()) {
        return ++progressingDeltaGIICount;
      }
    }

    public int getGCBlockCount() {
      synchronized (getBlockGCLock()) {
        return progressingDeltaGIICount;
      }
    }

    public Object getBlockGCLock() {
      return blockGCLock;
    }

    @Override
    protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
      boolean result = false;
      long removalSize = 0;
      synchronized (expiredTombstonesLock) {
        // Iterate in reverse order to optimize lots of removes.
        // Since expiredTombstones is an ArrayList removing from
        // low indexes requires moving everything at a higher index down.
        for (int idx = expiredTombstones.size() - 1; idx >= 0; idx--) {
          Tombstone t = expiredTombstones.get(idx);
          if (predicate.test(t)) {
            removalSize += t.getSize();
            expiredTombstones.remove(idx);
            result = true;
          }
        }
      }
      updateMemoryEstimate(-removalSize);
      return result;
    }

    /** expire a batch of tombstones */

    protected void expireBatch() {
      // fix for bug #46087 - OOME due to too many GC threads
      if (batchExpirationInProgress) {
        // incorrect return due to race between this and waiting-pool GC thread is okay
        // because the sweeper thread will just try again after its next sleep (max sleep is 10
        // seconds)
        return;
      }
      synchronized (getBlockGCLock()) {
        int count = getGCBlockCount();
        if (count > 0) {
          // if any delta GII is on going as provider at this member, not to do tombstone GC
          if (logger.isDebugEnabled()) {
            logger.debug("expireBatch skipped due to {} Delta GII on going", count);
          }
          return;
        }

        batchExpirationInProgress = true;
        boolean batchScheduled = false;
        try {

          // TODO seems like no need for the value of this map to be a Set.
          // It could instead be a List, which would be nice because the per entry
          // memory overhead for a set is much higher than an ArrayList
          // BUT we send it to clients and the old
          // version of them expects it to be a Set.
          final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();

          // Update the GC RVV for all of the affected regions.
          // We need to do this so that we can persist the GC RVV before
          // we start removing entries from the map.
          synchronized (expiredTombstonesLock) {
            for (Tombstone t : expiredTombstones) {
              DistributedRegion tr = (DistributedRegion) t.region;
              if (!tr.isInitialized()) {
                continue;
              }
              tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
              if (!reapedKeys.containsKey(tr)) {
                reapedKeys.put(tr, Collections.emptySet());
              }
            }
          }

          for (DistributedRegion r : reapedKeys.keySet()) {
            // Remove any exceptions from the RVV that are older than the GC version
            r.getVersionVector().pruneOldExceptions();

            // Persist the GC RVV to disk. This needs to happen BEFORE we remove
            // the entries from map, to prevent us from removing a tombstone
            // from disk that has a version greater than the persisted
            // GV RVV.
            if (r.getDataPolicy().withPersistence()) {
              r.getDiskRegion().writeRVVGC(r);
            }
          }

          // Remove the tombstones from the in memory region map.
          removeExpiredIf(t -> {
            // for PR buckets we have to keep track of the keys removed because clients have
            // them all lumped in a single non-PR region
            DistributedRegion tr = (DistributedRegion) t.region;
            if (reapedKeys.containsKey(tr)) {
              boolean tombstoneWasStillInRegionMap =
                  tr.getRegionMap().removeTombstone(t.entry, t);
              if (tombstoneWasStillInRegionMap && hasToTrackKeysForClients(tr)) {
                Set<Object> keys = reapedKeys.get(tr);
                if (keys.isEmpty()) {
                  keys = new HashSet<>();
                  reapedKeys.put(tr, keys);
                }
                keys.add(t.entry.getKey());
              }
            }
            return true;
          });

          // do messaging in a pool so this thread is not stuck trying to
          // communicate with other members
          executor.execute(() -> {
            try {
              // this thread should not reference other sweeper state, which is not synchronized
              for (Map.Entry<DistributedRegion, Set<Object>> mapEntry : reapedKeys.entrySet()) {
                DistributedRegion r = mapEntry.getKey();
                Set<Object> rKeysReaped = mapEntry.getValue();
                r.distributeTombstoneGC(rKeysReaped);
              }
            } finally {
              batchExpirationInProgress = false;
            }
          });
          batchScheduled = true;
        } finally {
          if (testHook_forceBatchExpireCall != null) {
            testHook_forceBatchExpireCall.countDown();
          }
          if (!batchScheduled) {
            batchExpirationInProgress = false;
          }
        }
      } // sync on deltaGIILock
    }

    /**
     * Returns true if keys needs to be tracked for clients registering interests on PR.
     */
    private boolean hasToTrackKeysForClients(DistributedRegion r) {
      return r.isUsedForPartitionedRegionBucket()
          && ((r.getFilterProfile() != null && r.getFilterProfile().hasInterest())
              || r.getPartitionedRegion().getRegionAdvisor().hasPRServerWithInterest());
    }

    @Override
    protected void checkExpiredTombstoneGC() {
      if (shouldCallExpireBatch()) {
        forceBatchExpiration = false;
        expireBatch();
      }
      checkIfBatchExpirationShouldBeForced();
    }

    private boolean shouldCallExpireBatch() {
      if (testHook_forceExpirationCount > 0) {
        return false;
      }
      if (forceBatchExpiration) {
        return true;
      }
      if (testHook_forceBatchExpireCall != null) {
        return true;
      }
      return expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT;
    }

    private void testHookIfIdleExpireBatch() {
      if (IDLE_EXPIRATION && sleepTime >= EXPIRY_TIME && !expiredTombstones.isEmpty()) {
        expireBatch();
      }
    }

    @Override
    protected void updateStatistics() {
      stats.setReplicatedTombstonesSize(getMemoryEstimate());
    }

    private void checkIfBatchExpirationShouldBeForced() {
      if (testHook_forceExpirationCount > 0) {
        return;
      }
      if (GC_MEMORY_THRESHOLD <= 0.0) {
        return;
      }
      if (batchExpirationInProgress) {
        return;
      }
      if (expiredTombstones.size() <= (EXPIRED_TOMBSTONE_LIMIT / 4)) {
        return;
      }
      if (FORCE_GC_MEMORY_EVENTS || isFreeMemoryLow()) {
        forceBatchExpiration = true;
        if (logger.isDebugEnabled()) {
          logger.debug("forcing batch expiration due to low memory conditions");
        }
      }
    }

    private boolean isFreeMemoryLow() {
      Runtime rt = Runtime.getRuntime();
      long unusedMemory = rt.freeMemory(); // "free" is how much space we have allocated that is
                                           // currently not used
      long totalMemory = rt.totalMemory(); // "total" is how much space we have allocated
      long maxMemory = rt.maxMemory(); // "max" is how much space we can allocate
      unusedMemory += (maxMemory - totalMemory); // "max-total" is how much space we have that has
                                                 // not yet been allocated
      return unusedMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD;
    }

    @Override
    protected boolean hasExpired(long msUntilTombstoneExpires) {
      if (testHook_forceExpirationCount > 0) {
        testHook_forceExpirationCount--;
        return true;
      }
      /*
       * In case the tombstone expiration time would be too far out lets cap it. This is just
       * making the system fault tolerant in the case that there are large clock jumps or
       * unrealistically large timestamps.
       */
      return msUntilTombstoneExpires <= 0 || msUntilTombstoneExpires > EXPIRY_TIME;
    }

    @Override
    protected void expireTombstone(Tombstone tombstone) {
      if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
        logger.trace(LogMarker.TOMBSTONE_VERBOSE, "adding expired tombstone {} to batch",
            tombstone);
      }
      synchronized (expiredTombstonesLock) {
        expiredTombstones.add(tombstone);
      }
    }

    @Override
    public long getOldestTombstoneTime() {
      long result = 0;
      if (tombstones.peek() != null) {
        result = tombstones.peek().getVersionTimeStamp();
      }
      return result;
    }

    @Override
    public String getOldestTombstone() {
      String result = null;
      if (tombstones.peek() != null) {
        result = tombstones.peek().toString();
      }
      return result;
    }

    @Override
    protected void handleNoUnexpiredTombstones() {
      testHook_forceExpirationCount = 0;
    }

    @Override
    public String toString() {
      return super.toString() + " batchedExpiredTombstones[" + expiredTombstones.size() + "] = "
          + expiredTombstones;
    }

    @Override
    boolean testHook_forceExpiredTombstoneGC(int count, long timeout, TimeUnit unit)
        throws InterruptedException {
      // sync on blockGCLock since expireBatch syncs on it
      synchronized (getBlockGCLock()) {
        testHook_forceBatchExpireCall = new CountDownLatch(1);
      }
      try {
        synchronized (this) {
          testHook_forceExpirationCount += count;
          notifyAll();
        }
        return testHook_forceBatchExpireCall.await(timeout, unit);
      } finally {
        testHook_forceBatchExpireCall = null;
      }
    }

    @Override
    protected void beforeSleepChecks() {
      testHookIfIdleExpireBatch();
    }

    @Override
    public long getScheduledTombstoneCount() {
      return super.getScheduledTombstoneCount() + expiredTombstones.size();
    }
  }

  public abstract static class TombstoneSweeper implements Runnable {
    /**
     * the expiration time for tombstones in this sweeper
     */
    protected final long EXPIRY_TIME;
    /**
     * The minimum amount of elapsed time, in millis, between purges.
     */
    private final long PURGE_INTERVAL;
    /**
     * How long the sweeper should sleep.
     */
    protected long sleepTime;
    /**
     * Estimate of how long, in millis, it will take to do a purge of obsolete tombstones.
     */
    private long minimumPurgeTime = 1;
    /**
     * Timestamp of when the last purge was done.
     */
    private long lastPurgeTimestamp;
    /**
     * the current tombstones. These are queued for expiration. When tombstones are resurrected they
     * are left in this queue and the sweeper thread figures out that they are no longer valid
     * tombstones.
     */
    @VisibleForTesting
    public final Queue<Tombstone> tombstones;
    /**
     * Estimate of the amount of memory used by this sweeper
     */
    private final AtomicLong memoryUsedEstimate;
    /**
     * the thread that handles tombstone expiration.
     */
    private final Thread sweeperThread;
    /**
     * A lock protecting the head of the tombstones queue. Operations that may remove the head need
     * to hold this lock.
     */
    private final StoppableReentrantLock queueHeadLock;


    protected final CacheTime cacheTime;
    protected final CachePerfStats stats;
    private final CancelCriterion cancelCriterion;

    private volatile boolean isStopped;

    TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
        long expiryTime, String threadName) {
      this.cacheTime = cacheTime;
      this.stats = stats;
      this.cancelCriterion = cancelCriterion;
      EXPIRY_TIME = expiryTime;
      PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
      tombstones = new ConcurrentLinkedQueue<>();
      memoryUsedEstimate = new AtomicLong();
      queueHeadLock = new StoppableReentrantLock(cancelCriterion);
      sweeperThread = new LoggingThread(threadName, this);
      lastPurgeTimestamp = getNow();
    }

    public void unscheduleTombstones(final LocalRegion r) {
      removeIf(t -> {
        return t.region == r;
      });
    }

    /**
     * For each unexpired tombstone this sweeper knows about call the predicate. If the predicate
     * returns true then remove the tombstone from any storage and update the memory estimate.
     *
     * @return true if predicate ever returned true
     */
    private boolean removeUnexpiredIf(Predicate<Tombstone> predicate) {
      boolean result = false;
      long removalSize = 0;
      lockQueueHead();
      try {
        for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext();) {
          Tombstone t = it.next();
          if (predicate.test(t)) {
            removalSize += t.getSize();
            it.remove();
            result = true;
          }
        }
      } finally {
        unlockQueueHead();
      }
      updateMemoryEstimate(-removalSize);
      return result;
    }

    /**
     * For all tombstone this sweeper knows about call the predicate. If the predicate returns true
     * then remove the tombstone from any storage and update the memory estimate.
     *
     * @return true if predicate ever returned true
     */
    private boolean removeIf(Predicate<Tombstone> predicate) {
      return removeUnexpiredIf(predicate) || removeExpiredIf(predicate);
    }

    synchronized void start() {
      sweeperThread.start();
    }

    void stop() {
      synchronized (this) {
        isStopped = true;
        notifyAll();
      }
      try {
        sweeperThread.join(100);
      } catch (InterruptedException ignore) {
        Thread.currentThread().interrupt();
      }
    }

    private void lockQueueHead() {
      queueHeadLock.lock();
    }

    private void unlockQueueHead() {
      queueHeadLock.unlock();
    }

    public long getMemoryEstimate() {
      return memoryUsedEstimate.get();
    }

    public void updateMemoryEstimate(long delta) {
      memoryUsedEstimate.addAndGet(delta);
    }

    protected Queue<Tombstone> getQueue() {
      return tombstones;
    }

    void scheduleTombstone(Tombstone ts) {
      tombstones.add(ts);
      updateMemoryEstimate(ts.getSize());
    }

    @Override
    public void run() {
      if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
        logger.trace(LogMarker.TOMBSTONE_VERBOSE,
            "Destroyed entries sweeper starting with sleep interval of {} milliseconds",
            EXPIRY_TIME);
      }
      while (!isStopped && !cancelCriterion.isCancelInProgress()) {
        try {
          updateStatistics();
          SystemFailure.checkFailure();
          final long now = getNow();
          checkExpiredTombstoneGC();
          checkOldestUnexpired(now);
          purgeObsoleteTombstones(now);
          doSleep();
        } catch (CancelException ignore) {
          break;
        } catch (VirtualMachineError err) { // GemStoneAddition
          SystemFailure.initiateFailure(err);
          // If this ever returns, rethrow the error. We're poisoned
          // now, so don't let this thread continue.
          throw err;
        } catch (Throwable e) {
          SystemFailure.checkFailure();
          logger.fatal("GemFire garbage collection service encountered an unexpected exception",
              e);
        }
      } // while()
    } // run()

    private long getNow() {
      return cacheTime.cacheTimeMillis();
    }

    private void doSleep() {
      if (sleepTime <= 0) {
        return;
      }
      beforeSleepChecks();
      sleepTime = Math.min(sleepTime, MAX_SLEEP_TIME);
      if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
        logger.trace(LogMarker.TOMBSTONE_VERBOSE, "sleeping for {}", sleepTime);
      }
      synchronized (this) {
        if (isStopped) {
          return;
        }
        try {
          wait(sleepTime);
        } catch (InterruptedException ignore) {
        }
      }
    }

    private void purgeObsoleteTombstones(final long now) {
      if (minimumPurgeTime > sleepTime) {
        // the purge might take minimumScanTime
        // and we have something to do sooner
        // than that so return
        return;
      }
      if ((now - lastPurgeTimestamp) < PURGE_INTERVAL) {
        // the time since the last purge
        // is less than the configured interval
        // so return
        return;
      }
      lastPurgeTimestamp = now;
      // see if any have been superseded
      boolean removedObsoleteTombstone = removeIf(tombstone -> {
        if (tombstone.region.getRegionMap().isTombstoneNotNeeded(tombstone.entry,
            tombstone.getEntryVersion())) {
          if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
            logger.trace(LogMarker.TOMBSTONE_VERBOSE, "removing obsolete tombstone: {}", tombstone);
          }
          return true;
        }
        return false;
      });
      if (removedObsoleteTombstone) {
        sleepTime = 0;
      } else {
        long elapsed = getNow() - now;
        sleepTime -= elapsed;
        if (sleepTime <= 0) {
          minimumPurgeTime = elapsed;
        }
      }
    }

    /**
     * See if the oldest unexpired tombstone should be expired.
     */
    @VisibleForTesting
    public void checkOldestUnexpired(long now) {
      sleepTime = 0;
      lockQueueHead();
      Tombstone oldest = tombstones.peek();
      try {
        if (oldest == null) {
          if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
            logger.trace(LogMarker.TOMBSTONE_VERBOSE, "queue is empty - will sleep");
          }
          handleNoUnexpiredTombstones();
          sleepTime = EXPIRY_TIME;
        } else {
          if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
            logger.trace(LogMarker.TOMBSTONE_VERBOSE, "oldest unexpired tombstone is {}", oldest);
          }
          long msUntilHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
          if (hasExpired(msUntilHeadTombstoneExpires)) {
            try {
              tombstones.remove();
              expireTombstone(oldest);
            } catch (CancelException ignore) {
              // nothing needed
            } catch (Exception e) {
              logger.warn("Unexpected exception while processing tombstones", e);
            }
          } else {
            sleepTime = Math.min(msUntilHeadTombstoneExpires, EXPIRY_TIME);
          }
        }
      } finally {
        unlockQueueHead();
      }
    }

    public long getScheduledTombstoneCount() {
      return getQueue().size();
    }

    @Override
    public String toString() {
      return "[" + getQueue().size() + "] " + getQueue().toString();
    }

    /**
     * For each expired tombstone this sweeper knows about call the predicate. If the predicate
     * returns true then remove the tombstone from any storage and update the memory estimate.
     * <p>
     * Some sweepers batch up the expired tombstones to gc them later.
     *
     * @return true if predicate ever returned true
     */
    protected abstract boolean removeExpiredIf(Predicate<Tombstone> predicate);

    /** see if the already expired tombstones should be processed */
    protected abstract void checkExpiredTombstoneGC();

    protected abstract void handleNoUnexpiredTombstones();

    protected abstract boolean hasExpired(long msUntilTombstoneExpires);

    protected abstract void expireTombstone(Tombstone tombstone);

    protected abstract void updateStatistics();

    /**
     * Do anything needed before the sweeper sleeps.
     */
    protected abstract void beforeSleepChecks();

    abstract boolean testHook_forceExpiredTombstoneGC(int count, long timeout, TimeUnit unit)
        throws InterruptedException;

    public abstract long getOldestTombstoneTime();

    public abstract String getOldestTombstone();

  } // class TombstoneSweeper
}
