TEPHRA-216 Handle empty transactional regions during inactive list pruning

This closes #34

Signed-off-by: poorna <poorna@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
index 8ea5a11..d73c50a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
@@ -93,7 +93,8 @@
         }
       }
       if (toTruncate.isEmpty()) {
-        LOG.info("Not pruning invalid list since no invalid id is less than or equal to the minimum prune upper bound");
+        LOG.info("Not pruning invalid list since the min prune upper bound {} is greater than the min invalid id {}",
+                 minPruneUpperBound, invalids[0]);
         return;
       }
 
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
index 52d7279..d80bbd4 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
@@ -21,6 +21,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
@@ -68,7 +69,11 @@
     }
 
     LOG.info("Starting {}...", this.getClass().getSimpleName());
-    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    scheduledExecutorService =
+      Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+                                                   .setNameFormat("tephra-pruning-thread")
+                                                   .setDaemon(true)
+                                                   .build());
 
     Map<String, TransactionPruningPlugin> plugins = initializePlugins();
     long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 53a8957..9ff4d3b 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
   }
 
   @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+    // Record whether the region is empty after a flush
+    HRegion region = e.getEnvironment().getRegion();
+    // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+    // then the region must be empty
+    long numStoreFiles = numStoreFilesForRegion(e);
+    long memstoreSize = region.getMemstoreSize().get();
+    LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+                            region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+    if (memstoreSize == 0 && numStoreFiles == 0) {
+      if (pruneEnable == null) {
+        initPruneState(e);
+      }
+
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        compactionState.persistRegionEmpty(System.currentTimeMillis());
+      }
+    }
+
+  }
+
+  @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
       CompactionRequest request)
@@ -318,25 +341,7 @@
     TransactionVisibilityState snapshot = cache.getLatestState();
 
     if (pruneEnable == null) {
-      Configuration conf = getConfiguration(c.getEnvironment());
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        if (Boolean.TRUE.equals(pruneEnable)) {
-          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                        + pruneTable);
-          }
-        }
-      }
+      initPruneState(c);
     }
 
     if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
+  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+    Configuration conf = getConfiguration(c.getEnvironment());
+    // Configuration won't be null in TransactionProcessor but the derived classes might return
+    // null if it is not available temporarily
+    if (conf != null) {
+      pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                    TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
+      }
+    }
+  }
+
+  private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+    long numStoreFiles = 0;
+    for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+      numStoreFiles += store.getStorefiles().size();
+    }
+    return numStoreFiles;
+  }
+
   /**
    * Filter used to include cells visible to in-progress transactions on flush and commit.
    */
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index c1f1825..7060244 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
   }
 
   /**
+   * Persist that the given region is empty at the given time
+   * @param time time in milliseconds
+   */
+  public void persistRegionEmpty(long time) {
+    pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+    }
+  }
+
+  /**
    * Releases the usage {@link PruneUpperBoundWriter}.
    */
   public void stop() {
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 979eb1a..4345fe6 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,11 +54,14 @@
  */
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
+  private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
   public static final byte[] FAMILY = {'f'};
   public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+  private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
 
   private static final byte[] REGION_KEY_PREFIX = {0x1};
   private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
 
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
   private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  // This value can be used when we don't care about the value we write in a column
+  private static final byte[] COL_VAL = Bytes.toBytes('1');
 
   private final TableSupplier stateTableSupplier;
 
@@ -148,7 +163,7 @@
     for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
       resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
     }
-    return resultMap;
+    return Collections.unmodifiableMap(resultMap);
   }
 
   /**
@@ -181,7 +196,7 @@
         }
       }
     }
-    return regionPruneInfos;
+    return Collections.unmodifiableList(regionPruneInfos);
   }
 
   /**
@@ -223,7 +238,7 @@
   // ---------------------------------------------------
   // ------- Methods for regions at a given time -------
   // ---------------------------------------------------
-  // Key: 0x2<time><region-id>
+  // Key: 0x2<inverted time><region-id>
   // Col 't': <empty byte array>
   // ---------------------------------------------------
 
@@ -240,12 +255,22 @@
     try (HTableInterface stateTable = stateTableSupplier.get()) {
       for (byte[] region : regions) {
         Put put = new Put(makeTimeRegionKey(timeBytes, region));
-        put.add(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        put.add(FAMILY, REGION_TIME_COL, COL_VAL);
         stateTable.put(put);
       }
+
+      // Save the count of regions as a checksum
+      saveRegionCountForTime(stateTable, timeBytes, regions.size());
     }
   }
 
+  @VisibleForTesting
+  void saveRegionCountForTime(HTableInterface stateTable, byte[] timeBytes, int count) throws IOException {
+    Put put = new Put(makeTimeRegionCountKey(timeBytes));
+    put.add(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+    stateTable.put(put);
+  }
+
   /**
    * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
    * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
    */
   @Nullable
   public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (HTableInterface stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, REGION_TIME_COL);
-
-      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-      long currentRegionTime = -1;
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
-          // Stop if reached next time value
-          if (currentRegionTime == -1) {
-            currentRegionTime = timeRegion.getKey();
-          } else if (timeRegion.getKey() < currentRegionTime) {
-            break;
-          } else if (timeRegion.getKey() > currentRegionTime) {
-            throw new IllegalStateException(
-              String.format("Got out of order time %d when expecting time less than or equal to %d",
-                            timeRegion.getKey(), currentRegionTime));
-          }
-          regions.add(timeRegion.getValue());
+      TimeRegions timeRegions;
+      while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+        int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+        if (count != -1 && count == timeRegions.getRegions().size()) {
+          return timeRegions;
+        } else {
+          LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+                                 timeRegions.getTime(), count, timeRegions.getRegions().size()));
+          time = time - 1;
         }
       }
-      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+      return null;
     }
   }
 
+  @Nullable
+  private TimeRegions getNextSetOfTimeRegions(HTableInterface stateTable, long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+    scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+    long currentRegionTime = -1;
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Result next;
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      while ((next = scanner.next()) != null) {
+        Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+        // Stop if reached next time value
+        if (currentRegionTime == -1) {
+          currentRegionTime = timeRegion.getKey();
+        } else if (timeRegion.getKey() < currentRegionTime) {
+          break;
+        } else if (timeRegion.getKey() > currentRegionTime) {
+          throw new IllegalStateException(
+            String.format("Got out of order time %d when expecting time less than or equal to %d",
+                          timeRegion.getKey(), currentRegionTime));
+        }
+        regions.add(timeRegion.getValue());
+      }
+    }
+    return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+  }
+
+  @VisibleForTesting
+  int getRegionCountForTime(HTableInterface stateTable, long time) throws IOException {
+    Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+    get.addColumn(FAMILY, REGION_TIME_COL);
+    Result result = stateTable.get(get);
+    byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+    return value == null ? -1 : Bytes.toInt(value);
+  }
+
   /**
    * Delete all the regions that were recorded for all times equal or less than the given time.
    *
@@ -294,15 +345,15 @@
   public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
     byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (HTableInterface stateTable = stateTableSupplier.get()) {
+      // Delete the regions
       Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
 
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
-        }
-      }
+      // Delete the count
+      scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
     }
   }
 
@@ -356,14 +407,82 @@
       Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
                            INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // --------------------------------------------------------
+  // ------- Methods for empty regions at a given time -------
+  // --------------------------------------------------------
+  // Key: 0x4<time><region-id>
+  // Col 'e': <empty byte array>
+  // --------------------------------------------------------
+
+  /**
+   * Save the given region as empty as of the given time.
+   *
+   * @param time time in milliseconds
+   * @param regionId region id
+   */
+  public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(time);
+    try (HTableInterface stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+      put.add(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return regions that were recorded as empty after the given time.
+   *
+   * @param time time in milliseconds
+   * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+   *                       and the empty regions after the given time
+   */
+  public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+    throws IOException {
+    SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (HTableInterface stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+                           EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
 
       try (ResultScanner scanner = stateTable.getScanner(scan)) {
         Result next;
         while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
+          byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+          if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+            emptyRegions.add(emptyRegion);
+          }
         }
       }
     }
+    return Collections.unmodifiableSortedSet(emptyRegions);
+  }
+
+  /**
+   * Delete empty region records saved on or before the given time.
+   *
+   * @param time time in milliseconds
+   */
+  public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+    try (HTableInterface stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan();
+      scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  @VisibleForTesting
+  void deleteFromScan(HTableInterface stateTable, Scan scan) throws IOException {
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      Result next;
+      while ((next = scanner.next()) != null) {
+        stateTable.delete(new Delete(next.getRow()));
+      }
+    }
   }
 
   private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
     return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
   }
 
+  private byte[] makeTimeRegionCountKey(byte[] time) {
+    return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+  }
+
   private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
     return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
   }
@@ -391,6 +514,15 @@
     return Maps.immutableEntry(time, regionName);
   }
 
+  private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+    return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] getEmptyRegionFromKey(byte[] key) {
+    int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+    return Bytes.copy(key, prefixLen, key.length - prefixLen);
+  }
+
   private long getInvertedTime(long time) {
     return Long.MAX_VALUE - time;
   }
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 80da8d8..021f1b2 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 /**
@@ -203,6 +204,8 @@
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
     LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
     dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
   }
 
   @Override
@@ -295,26 +298,40 @@
       SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
       long time = timeRegions.getTime();
 
-      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        continue;
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
       logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
       // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
       // across all regions
-      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
-        long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
-        LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
-        // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
-        if (inactiveTransactionBound != -1) {
-          Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
-          return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
-                        "and hence the data must be incomplete", time);
-          }
-        }
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
       } else {
         if (LOG.isDebugEnabled()) {
-          Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+          Sets.SetView<byte[]> difference =
+            Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
           LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                     time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
         }
@@ -325,6 +342,28 @@
     return -1;
   }
 
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+    // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+    // for these empty regions as inactiveTransactionBound
+    Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+      }
+    }
+    return Collections.unmodifiableMap(pubWithEmptyRegions);
+  }
+
   private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 5a86b4a..beed1ad 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
   private final TableName tableName;
   private final DataJanitorState dataJanitorState;
   private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
   private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
 
   private long lastChecked;
 
+  @SuppressWarnings("WeakerAccess")
   public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
     this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
     this.pruneFlushInterval = pruneFlushInterval;
     this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   }
 
+  @SuppressWarnings("WeakerAccess")
   public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
     // The number of entries in this map is bound by the number of regions in this region server and thus it will not
     // grow indefinitely
     pruneEntries.put(regionName, pruneUpperBound);
   }
 
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
   public boolean isAlive() {
     return flushThread != null && flushThread.isAlive();
   }
@@ -86,13 +102,22 @@
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              while (pruneEntries.firstEntry() != null) {
+              // Record prune upper bound
+              while (!pruneEntries.isEmpty()) {
                 Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
                 dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
                 // We can now remove the entry only if the key and value match with what we wrote since it is
                 // possible that a new pruneUpperBound for the same key has been added
                 pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+              // Record empty regions
+              while (!emptyRegions.isEmpty()) {
+                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new value for the same key has been added
+                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+              }
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
@@ -115,4 +140,11 @@
     flushThread.setDaemon(true);
     flushThread.start();
   }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
 }
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 3ae0423..14bf96c 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -161,6 +161,7 @@
     }
 
     // Verify saved regions
+    Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -168,20 +169,39 @@
                         dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
 
+    // Now change the count stored for regions saved at time 0 and 30
+    try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+    }
+    // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
     // Delete regions saved on or before time 30
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
     // Values on or before time 30 should be deleted
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+    // Counts should be deleted for time on or before 30
+    try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+    }
     // Values after time 30 should still exist
     Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+    try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+      Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+    }
   }
 
   @Test
   public void testSaveInactiveTransactionBoundTime() throws Exception {
     int maxTime = 100;
 
-    // Nothing sould be present in the beginning
+    // Nothing should be present in the beginning
     Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
 
     // Save inactive transaction bounds for various time values
@@ -207,4 +227,59 @@
     Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
     Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
   }
+
+  @Test
+  public void testSaveEmptyRegions() throws Exception {
+    // Nothing should be present in the beginning
+    Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    byte[] region1 = Bytes.toBytes("region1");
+    byte[] region2 = Bytes.toBytes("region2");
+    byte[] region3 = Bytes.toBytes("region3");
+    byte[] region4 = Bytes.toBytes("region4");
+    SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+    // Now record some empty regions
+    dataJanitorState.saveEmptyRegionForTime(100, region1);
+    dataJanitorState.saveEmptyRegionForTime(110, region1);
+    dataJanitorState.saveEmptyRegionForTime(102, region2);
+    dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    Assert.assertEquals(toISet(region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+    Assert.assertEquals(toISet(region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+    // Delete empty regions on or before time 110
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+    // Now only region3 should remain
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    // Delete empty regions on or before time 150
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+    // Now nothing should remain
+    Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+  }
+
+  private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+    ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+    for (byte[] arg : args) {
+      builder.add(arg);
+    }
+    return builder.build();
+  }
 }
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index fbd4d7d..e3f5c6b 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -74,8 +74,9 @@
 
   private static TableName txDataTable1;
   private static TableName pruneStateTable;
+  private static DataJanitorState dataJanitorState;
 
-  private HConnection connection;
+  private static HConnection connection;
 
   // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
   @BeforeClass
@@ -109,17 +110,25 @@
 
     pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                  TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    connection = HConnectionManager.createConnection(conf);
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public HTableInterface get() throws IOException {
+          return connection.getTable(pruneStateTable);
+        }
+      });
   }
 
   @AfterClass
   public static void shutdownAfterClass() throws Exception {
+    connection.close();
     hBaseAdmin.disableTable(txDataTable1);
     hBaseAdmin.deleteTable(txDataTable1);
   }
 
   @Before
   public void beforeTest() throws Exception {
-    connection = HConnectionManager.createConnection(conf);
     createPruneStateTable();
     InMemoryTransactionStateCache.setTransactionSnapshot(null);
   }
@@ -133,8 +142,12 @@
 
   @After
   public void afterTest() throws Exception {
+    // Disable the data table so that prune writer thread gets stopped,
+    // this makes sure that any cached value will not interfere with next test
+    hBaseAdmin.disableTable(txDataTable1);
     deletePruneStateTable();
-    connection.close();
+    // Enabling the table enables the prune writer thread again
+    hBaseAdmin.enableTable(txDataTable1);
   }
 
   private void deletePruneStateTable() throws Exception {
@@ -144,34 +157,8 @@
     }
   }
 
-  private void truncatePruneStateTable() throws Exception {
-    if (hBaseAdmin.tableExists(pruneStateTable)) {
-      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
-        hBaseAdmin.disableTable(pruneStateTable);
-      }
-      HTableDescriptor htd = hBaseAdmin.getTableDescriptor(pruneStateTable);
-      hBaseAdmin.deleteTable(pruneStateTable);
-      hBaseAdmin.createTable(htd);
-    }
-  }
-
   @Test
   public void testRecordCompactionState() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public HTableInterface get() throws IOException {
-          return connection.getTable(pruneStateTable);
-        }
-      });
-
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -220,10 +207,6 @@
 
   @Test
   public void testRecordCompactionStateNoTable() throws Exception {
-    // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
-    // and make sure a major compaction succeeds
-    deletePruneStateTable();
-
     // Create a new transaction snapshot
     InMemoryTransactionStateCache.setTransactionSnapshot(
       new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -255,23 +238,9 @@
 
   @Test
   public void testPruneUpperBound() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public HTableInterface get() throws IOException {
-          return connection.getTable(pruneStateTable);
-        }
-      });
-
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
+
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -341,6 +310,87 @@
     }
   }
 
+  @Test
+  public void testPruneEmptyTable() throws Exception {
+    // Make sure that empty tables do not block the progress of pruning
+
+    // Create an empty table
+    TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+    HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+                                     Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+
+    try {
+      long now1 = System.currentTimeMillis();
+      long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+      long noPruneUpperBound = -1;
+      long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+                                ImmutableSet.of(expectedPruneUpperBound1),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.compact(txEmptyTable, true);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+      // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+      testUtil.flush(txEmptyTable);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, again, this time it should work
+      pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Now add some data to the empty table
+      // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+      emptyHTable.put(new Put(Bytes.toBytes(1)).add(family, qualifier, Bytes.toBytes(1)));
+      emptyHTable.close();
+
+      // Now run another compaction on txDataTable1 with an updated tx snapshot
+      long now2 = System.currentTimeMillis();
+      long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+      // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+      // empty in the previous run with inactiveTxTimeNow1
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+      // However, after compacting txEmptyTable we should get the latest upper bound
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txEmptyTable, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+      pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+    } finally {
+      transactionPruningPlugin.destroy();
+      hBaseAdmin.disableTable(txEmptyTable);
+      hBaseAdmin.deleteTable(txEmptyTable);
+    }
+  }
+
   private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
     HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true);
     return regionLocation.getRegionInfo().getRegionName();
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9e9dd46..7485b91 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
   }
 
   @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+    // Record whether the region is empty after a flush
+    HRegion region = e.getEnvironment().getRegion();
+    // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+    // then the region must be empty
+    long numStoreFiles = numStoreFilesForRegion(e);
+    long memstoreSize = region.getMemstoreSize().get();
+    LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+                            region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+    if (memstoreSize == 0 && numStoreFiles == 0) {
+      if (pruneEnable == null) {
+        initPruneState(e);
+      }
+
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        compactionState.persistRegionEmpty(System.currentTimeMillis());
+      }
+    }
+
+  }
+
+  @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
       CompactionRequest request)
@@ -318,25 +341,7 @@
     TransactionVisibilityState snapshot = cache.getLatestState();
 
     if (pruneEnable == null) {
-      Configuration conf = getConfiguration(c.getEnvironment());
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        if (Boolean.TRUE.equals(pruneEnable)) {
-          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                        + pruneTable);
-          }
-        }
-      }
+      initPruneState(c);
     }
 
     if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
+  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+    Configuration conf = getConfiguration(c.getEnvironment());
+    // Configuration won't be null in TransactionProcessor but the derived classes might return
+    // null if it is not available temporarily
+    if (conf != null) {
+      pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                    TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
+      }
+    }
+  }
+
+  private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+    long numStoreFiles = 0;
+    for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+      numStoreFiles += store.getStorefiles().size();
+    }
+    return numStoreFiles;
+  }
+
   /**
    * Filter used to include cells visible to in-progress transactions on flush and commit.
    */
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index c1f1825..7060244 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
   }
 
   /**
+   * Persist that the given region is empty at the given time
+   * @param time time in milliseconds
+   */
+  public void persistRegionEmpty(long time) {
+    pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+    }
+  }
+
+  /**
    * Releases the usage {@link PruneUpperBoundWriter}.
    */
   public void stop() {
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 979eb1a..4345fe6 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,11 +54,14 @@
  */
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
+  private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
   public static final byte[] FAMILY = {'f'};
   public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+  private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
 
   private static final byte[] REGION_KEY_PREFIX = {0x1};
   private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
 
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
   private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  // This value can be used when we don't care about the value we write in a column
+  private static final byte[] COL_VAL = Bytes.toBytes('1');
 
   private final TableSupplier stateTableSupplier;
 
@@ -148,7 +163,7 @@
     for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
       resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
     }
-    return resultMap;
+    return Collections.unmodifiableMap(resultMap);
   }
 
   /**
@@ -181,7 +196,7 @@
         }
       }
     }
-    return regionPruneInfos;
+    return Collections.unmodifiableList(regionPruneInfos);
   }
 
   /**
@@ -223,7 +238,7 @@
   // ---------------------------------------------------
   // ------- Methods for regions at a given time -------
   // ---------------------------------------------------
-  // Key: 0x2<time><region-id>
+  // Key: 0x2<inverted time><region-id>
   // Col 't': <empty byte array>
   // ---------------------------------------------------
 
@@ -240,12 +255,22 @@
     try (HTableInterface stateTable = stateTableSupplier.get()) {
       for (byte[] region : regions) {
         Put put = new Put(makeTimeRegionKey(timeBytes, region));
-        put.add(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        put.add(FAMILY, REGION_TIME_COL, COL_VAL);
         stateTable.put(put);
       }
+
+      // Save the count of regions as a checksum
+      saveRegionCountForTime(stateTable, timeBytes, regions.size());
     }
   }
 
+  @VisibleForTesting
+  void saveRegionCountForTime(HTableInterface stateTable, byte[] timeBytes, int count) throws IOException {
+    Put put = new Put(makeTimeRegionCountKey(timeBytes));
+    put.add(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+    stateTable.put(put);
+  }
+
   /**
    * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
    * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
    */
   @Nullable
   public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (HTableInterface stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, REGION_TIME_COL);
-
-      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-      long currentRegionTime = -1;
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
-          // Stop if reached next time value
-          if (currentRegionTime == -1) {
-            currentRegionTime = timeRegion.getKey();
-          } else if (timeRegion.getKey() < currentRegionTime) {
-            break;
-          } else if (timeRegion.getKey() > currentRegionTime) {
-            throw new IllegalStateException(
-              String.format("Got out of order time %d when expecting time less than or equal to %d",
-                            timeRegion.getKey(), currentRegionTime));
-          }
-          regions.add(timeRegion.getValue());
+      TimeRegions timeRegions;
+      while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+        int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+        if (count != -1 && count == timeRegions.getRegions().size()) {
+          return timeRegions;
+        } else {
+          LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+                                 timeRegions.getTime(), count, timeRegions.getRegions().size()));
+          time = time - 1;
         }
       }
-      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+      return null;
     }
   }
 
+  @Nullable
+  private TimeRegions getNextSetOfTimeRegions(HTableInterface stateTable, long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+    scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+    long currentRegionTime = -1;
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Result next;
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      while ((next = scanner.next()) != null) {
+        Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+        // Stop if reached next time value
+        if (currentRegionTime == -1) {
+          currentRegionTime = timeRegion.getKey();
+        } else if (timeRegion.getKey() < currentRegionTime) {
+          break;
+        } else if (timeRegion.getKey() > currentRegionTime) {
+          throw new IllegalStateException(
+            String.format("Got out of order time %d when expecting time less than or equal to %d",
+                          timeRegion.getKey(), currentRegionTime));
+        }
+        regions.add(timeRegion.getValue());
+      }
+    }
+    return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+  }
+
+  @VisibleForTesting
+  int getRegionCountForTime(HTableInterface stateTable, long time) throws IOException {
+    Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+    get.addColumn(FAMILY, REGION_TIME_COL);
+    Result result = stateTable.get(get);
+    byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+    return value == null ? -1 : Bytes.toInt(value);
+  }
+
   /**
    * Delete all the regions that were recorded for all times equal or less than the given time.
    *
@@ -294,15 +345,15 @@
   public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
     byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (HTableInterface stateTable = stateTableSupplier.get()) {
+      // Delete the regions
       Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
 
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
-        }
-      }
+      // Delete the count
+      scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
     }
   }
 
@@ -356,14 +407,82 @@
       Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
                            INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // --------------------------------------------------------
+  // ------- Methods for empty regions at a given time -------
+  // --------------------------------------------------------
+  // Key: 0x4<time><region-id>
+  // Col 'e': <empty byte array>
+  // --------------------------------------------------------
+
+  /**
+   * Save the given region as empty as of the given time.
+   *
+   * @param time time in milliseconds
+   * @param regionId region id
+   */
+  public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(time);
+    try (HTableInterface stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+      put.add(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return regions that were recorded as empty after the given time.
+   *
+   * @param time time in milliseconds
+   * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+   *                       and the empty regions after the given time
+   */
+  public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+    throws IOException {
+    SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (HTableInterface stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+                           EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
 
       try (ResultScanner scanner = stateTable.getScanner(scan)) {
         Result next;
         while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
+          byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+          if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+            emptyRegions.add(emptyRegion);
+          }
         }
       }
     }
+    return Collections.unmodifiableSortedSet(emptyRegions);
+  }
+
+  /**
+   * Delete empty region records saved on or before the given time.
+   *
+   * @param time time in milliseconds
+   */
+  public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+    try (HTableInterface stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan();
+      scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  @VisibleForTesting
+  void deleteFromScan(HTableInterface stateTable, Scan scan) throws IOException {
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      Result next;
+      while ((next = scanner.next()) != null) {
+        stateTable.delete(new Delete(next.getRow()));
+      }
+    }
   }
 
   private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
     return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
   }
 
+  private byte[] makeTimeRegionCountKey(byte[] time) {
+    return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+  }
+
   private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
     return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
   }
@@ -391,6 +514,15 @@
     return Maps.immutableEntry(time, regionName);
   }
 
+  private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+    return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] getEmptyRegionFromKey(byte[] key) {
+    int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+    return Bytes.copy(key, prefixLen, key.length - prefixLen);
+  }
+
   private long getInvertedTime(long time) {
     return Long.MAX_VALUE - time;
   }
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 80da8d8..021f1b2 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 /**
@@ -203,6 +204,8 @@
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
     LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
     dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
   }
 
   @Override
@@ -295,26 +298,40 @@
       SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
       long time = timeRegions.getTime();
 
-      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        continue;
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
       logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
       // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
       // across all regions
-      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
-        long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
-        LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
-        // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
-        if (inactiveTransactionBound != -1) {
-          Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
-          return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
-                        "and hence the data must be incomplete", time);
-          }
-        }
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
       } else {
         if (LOG.isDebugEnabled()) {
-          Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+          Sets.SetView<byte[]> difference =
+            Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
           LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                     time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
         }
@@ -325,6 +342,28 @@
     return -1;
   }
 
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+    // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+    // for these empty regions as inactiveTransactionBound
+    Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+      }
+    }
+    return Collections.unmodifiableMap(pubWithEmptyRegions);
+  }
+
   private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 5a86b4a..beed1ad 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
   private final TableName tableName;
   private final DataJanitorState dataJanitorState;
   private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
   private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
 
   private long lastChecked;
 
+  @SuppressWarnings("WeakerAccess")
   public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
     this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
     this.pruneFlushInterval = pruneFlushInterval;
     this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   }
 
+  @SuppressWarnings("WeakerAccess")
   public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
     // The number of entries in this map is bound by the number of regions in this region server and thus it will not
     // grow indefinitely
     pruneEntries.put(regionName, pruneUpperBound);
   }
 
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
   public boolean isAlive() {
     return flushThread != null && flushThread.isAlive();
   }
@@ -86,13 +102,22 @@
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              while (pruneEntries.firstEntry() != null) {
+              // Record prune upper bound
+              while (!pruneEntries.isEmpty()) {
                 Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
                 dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
                 // We can now remove the entry only if the key and value match with what we wrote since it is
                 // possible that a new pruneUpperBound for the same key has been added
                 pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+              // Record empty regions
+              while (!emptyRegions.isEmpty()) {
+                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new value for the same key has been added
+                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+              }
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
@@ -115,4 +140,11 @@
     flushThread.setDaemon(true);
     flushThread.start();
   }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
 }
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 3ae0423..14bf96c 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -161,6 +161,7 @@
     }
 
     // Verify saved regions
+    Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -168,20 +169,39 @@
                         dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
 
+    // Now change the count stored for regions saved at time 0 and 30
+    try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+    }
+    // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
     // Delete regions saved on or before time 30
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
     // Values on or before time 30 should be deleted
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+    // Counts should be deleted for time on or before 30
+    try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+    }
     // Values after time 30 should still exist
     Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+    try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+      Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+    }
   }
 
   @Test
   public void testSaveInactiveTransactionBoundTime() throws Exception {
     int maxTime = 100;
 
-    // Nothing sould be present in the beginning
+    // Nothing should be present in the beginning
     Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
 
     // Save inactive transaction bounds for various time values
@@ -207,4 +227,59 @@
     Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
     Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
   }
+
+  @Test
+  public void testSaveEmptyRegions() throws Exception {
+    // Nothing should be present in the beginning
+    Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    byte[] region1 = Bytes.toBytes("region1");
+    byte[] region2 = Bytes.toBytes("region2");
+    byte[] region3 = Bytes.toBytes("region3");
+    byte[] region4 = Bytes.toBytes("region4");
+    SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+    // Now record some empty regions
+    dataJanitorState.saveEmptyRegionForTime(100, region1);
+    dataJanitorState.saveEmptyRegionForTime(110, region1);
+    dataJanitorState.saveEmptyRegionForTime(102, region2);
+    dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    Assert.assertEquals(toISet(region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+    Assert.assertEquals(toISet(region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+    // Delete empty regions on or before time 110
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+    // Now only region3 should remain
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    // Delete empty regions on or before time 150
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+    // Now nothing should remain
+    Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+  }
+
+  private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+    ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+    for (byte[] arg : args) {
+      builder.add(arg);
+    }
+    return builder.build();
+  }
 }
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 37f732c..e3f5c6b 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -74,8 +74,9 @@
 
   private static TableName txDataTable1;
   private static TableName pruneStateTable;
+  private static DataJanitorState dataJanitorState;
 
-  private HConnection connection;
+  private static HConnection connection;
 
   // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
   @BeforeClass
@@ -109,17 +110,25 @@
 
     pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                  TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    connection = HConnectionManager.createConnection(conf);
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public HTableInterface get() throws IOException {
+          return connection.getTable(pruneStateTable);
+        }
+      });
   }
 
   @AfterClass
   public static void shutdownAfterClass() throws Exception {
+    connection.close();
     hBaseAdmin.disableTable(txDataTable1);
     hBaseAdmin.deleteTable(txDataTable1);
   }
 
   @Before
   public void beforeTest() throws Exception {
-    connection = HConnectionManager.createConnection(conf);
     createPruneStateTable();
     InMemoryTransactionStateCache.setTransactionSnapshot(null);
   }
@@ -133,8 +142,12 @@
 
   @After
   public void afterTest() throws Exception {
+    // Disable the data table so that prune writer thread gets stopped,
+    // this makes sure that any cached value will not interfere with next test
+    hBaseAdmin.disableTable(txDataTable1);
     deletePruneStateTable();
-    connection.close();
+    // Enabling the table enables the prune writer thread again
+    hBaseAdmin.enableTable(txDataTable1);
   }
 
   private void deletePruneStateTable() throws Exception {
@@ -144,32 +157,8 @@
     }
   }
 
-  private void truncatePruneStateTable() throws Exception {
-    if (hBaseAdmin.tableExists(pruneStateTable)) {
-      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
-        hBaseAdmin.disableTable(pruneStateTable);
-      }
-      hBaseAdmin.truncateTable(pruneStateTable, true);
-    }
-  }
-
   @Test
   public void testRecordCompactionState() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public HTableInterface get() throws IOException {
-          return connection.getTable(pruneStateTable);
-        }
-      });
-
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -218,10 +207,6 @@
 
   @Test
   public void testRecordCompactionStateNoTable() throws Exception {
-    // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
-    // and make sure a major compaction succeeds
-    deletePruneStateTable();
-
     // Create a new transaction snapshot
     InMemoryTransactionStateCache.setTransactionSnapshot(
       new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -253,24 +238,9 @@
 
   @Test
   public void testPruneUpperBound() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public HTableInterface get() throws IOException {
-          return connection.getTable(pruneStateTable);
-        }
-      });
-
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
 
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -340,6 +310,87 @@
     }
   }
 
+  @Test
+  public void testPruneEmptyTable() throws Exception {
+    // Make sure that empty tables do not block the progress of pruning
+
+    // Create an empty table
+    TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+    HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+                                     Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+
+    try {
+      long now1 = System.currentTimeMillis();
+      long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+      long noPruneUpperBound = -1;
+      long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+                                ImmutableSet.of(expectedPruneUpperBound1),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.compact(txEmptyTable, true);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+      // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+      testUtil.flush(txEmptyTable);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, again, this time it should work
+      pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Now add some data to the empty table
+      // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+      emptyHTable.put(new Put(Bytes.toBytes(1)).add(family, qualifier, Bytes.toBytes(1)));
+      emptyHTable.close();
+
+      // Now run another compaction on txDataTable1 with an updated tx snapshot
+      long now2 = System.currentTimeMillis();
+      long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+      // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+      // empty in the previous run with inactiveTxTimeNow1
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+      // However, after compacting txEmptyTable we should get the latest upper bound
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txEmptyTable, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+      pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+    } finally {
+      transactionPruningPlugin.destroy();
+      hBaseAdmin.disableTable(txEmptyTable);
+      hBaseAdmin.deleteTable(txEmptyTable);
+    }
+  }
+
   private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
     HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true);
     return regionLocation.getRegionInfo().getRegionName();
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9e9dd46..7485b91 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
   }
 
   @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+    // Record whether the region is empty after a flush
+    HRegion region = e.getEnvironment().getRegion();
+    // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+    // then the region must be empty
+    long numStoreFiles = numStoreFilesForRegion(e);
+    long memstoreSize = region.getMemstoreSize().get();
+    LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+                            region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+    if (memstoreSize == 0 && numStoreFiles == 0) {
+      if (pruneEnable == null) {
+        initPruneState(e);
+      }
+
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        compactionState.persistRegionEmpty(System.currentTimeMillis());
+      }
+    }
+
+  }
+
+  @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
       CompactionRequest request)
@@ -318,25 +341,7 @@
     TransactionVisibilityState snapshot = cache.getLatestState();
 
     if (pruneEnable == null) {
-      Configuration conf = getConfiguration(c.getEnvironment());
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        if (Boolean.TRUE.equals(pruneEnable)) {
-          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                        + pruneTable);
-          }
-        }
-      }
+      initPruneState(c);
     }
 
     if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
+  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+    Configuration conf = getConfiguration(c.getEnvironment());
+    // Configuration won't be null in TransactionProcessor but the derived classes might return
+    // null if it is not available temporarily
+    if (conf != null) {
+      pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                    TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
+      }
+    }
+  }
+
+  private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+    long numStoreFiles = 0;
+    for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+      numStoreFiles += store.getStorefiles().size();
+    }
+    return numStoreFiles;
+  }
+
   /**
    * Filter used to include cells visible to in-progress transactions on flush and commit.
    */
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index db7880b..9b856d9 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
   }
 
   /**
+   * Persist that the given region is empty at the given time
+   * @param time time in milliseconds
+   */
+  public void persistRegionEmpty(long time) {
+    pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+    }
+  }
+
+  /**
    * Releases the usage {@link PruneUpperBoundWriter}.
    */
   public void stop() {
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 897e00e..fc0ec76 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,11 +54,14 @@
  */
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
+  private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
   public static final byte[] FAMILY = {'f'};
   public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+  private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
 
   private static final byte[] REGION_KEY_PREFIX = {0x1};
   private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
 
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
   private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  // This value can be used when we don't care about the value we write in a column
+  private static final byte[] COL_VAL = Bytes.toBytes('1');
 
   private final TableSupplier stateTableSupplier;
 
@@ -148,7 +163,7 @@
     for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
       resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
     }
-    return resultMap;
+    return Collections.unmodifiableMap(resultMap);
   }
 
   /**
@@ -181,7 +196,7 @@
         }
       }
     }
-    return regionPruneInfos;
+    return Collections.unmodifiableList(regionPruneInfos);
   }
 
   /**
@@ -223,7 +238,7 @@
   // ---------------------------------------------------
   // ------- Methods for regions at a given time -------
   // ---------------------------------------------------
-  // Key: 0x2<time><region-id>
+  // Key: 0x2<inverted time><region-id>
   // Col 't': <empty byte array>
   // ---------------------------------------------------
 
@@ -240,12 +255,22 @@
     try (Table stateTable = stateTableSupplier.get()) {
       for (byte[] region : regions) {
         Put put = new Put(makeTimeRegionKey(timeBytes, region));
-        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
         stateTable.put(put);
       }
+
+      // Save the count of regions as a checksum
+      saveRegionCountForTime(stateTable, timeBytes, regions.size());
     }
   }
 
+  @VisibleForTesting
+  void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+    Put put = new Put(makeTimeRegionCountKey(timeBytes));
+    put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+    stateTable.put(put);
+  }
+
   /**
    * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
    * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
    */
   @Nullable
   public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (Table stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, REGION_TIME_COL);
-
-      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-      long currentRegionTime = -1;
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
-          // Stop if reached next time value
-          if (currentRegionTime == -1) {
-            currentRegionTime = timeRegion.getKey();
-          } else if (timeRegion.getKey() < currentRegionTime) {
-            break;
-          } else if (timeRegion.getKey() > currentRegionTime) {
-            throw new IllegalStateException(
-              String.format("Got out of order time %d when expecting time less than or equal to %d",
-                            timeRegion.getKey(), currentRegionTime));
-          }
-          regions.add(timeRegion.getValue());
+      TimeRegions timeRegions;
+      while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+        int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+        if (count != -1 && count == timeRegions.getRegions().size()) {
+          return timeRegions;
+        } else {
+          LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+                                 timeRegions.getTime(), count, timeRegions.getRegions().size()));
+          time = time - 1;
         }
       }
-      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+      return null;
     }
   }
 
+  @Nullable
+  private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+    scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+    long currentRegionTime = -1;
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Result next;
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      while ((next = scanner.next()) != null) {
+        Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+        // Stop if reached next time value
+        if (currentRegionTime == -1) {
+          currentRegionTime = timeRegion.getKey();
+        } else if (timeRegion.getKey() < currentRegionTime) {
+          break;
+        } else if (timeRegion.getKey() > currentRegionTime) {
+          throw new IllegalStateException(
+            String.format("Got out of order time %d when expecting time less than or equal to %d",
+                          timeRegion.getKey(), currentRegionTime));
+        }
+        regions.add(timeRegion.getValue());
+      }
+    }
+    return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+  }
+
+  @VisibleForTesting
+  int getRegionCountForTime(Table stateTable, long time) throws IOException {
+    Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+    get.addColumn(FAMILY, REGION_TIME_COL);
+    Result result = stateTable.get(get);
+    byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+    return value == null ? -1 : Bytes.toInt(value);
+  }
+
   /**
    * Delete all the regions that were recorded for all times equal or less than the given time.
    *
@@ -294,15 +345,15 @@
   public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
     byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (Table stateTable = stateTableSupplier.get()) {
+      // Delete the regions
       Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
 
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
-        }
-      }
+      // Delete the count
+      scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
     }
   }
 
@@ -356,14 +407,82 @@
       Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
                            INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // --------------------------------------------------------
+  // ------- Methods for empty regions at a given time -------
+  // --------------------------------------------------------
+  // Key: 0x4<time><region-id>
+  // Col 'e': <empty byte array>
+  // --------------------------------------------------------
+
+  /**
+   * Save the given region as empty as of the given time.
+   *
+   * @param time time in milliseconds
+   * @param regionId region id
+   */
+  public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(time);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+      put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return regions that were recorded as empty after the given time.
+   *
+   * @param time time in milliseconds
+   * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+   *                       and the empty regions after the given time
+   */
+  public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+    throws IOException {
+    SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+                           EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
 
       try (ResultScanner scanner = stateTable.getScanner(scan)) {
         Result next;
         while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
+          byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+          if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+            emptyRegions.add(emptyRegion);
+          }
         }
       }
     }
+    return Collections.unmodifiableSortedSet(emptyRegions);
+  }
+
+  /**
+   * Delete empty region records saved on or before the given time.
+   *
+   * @param time time in milliseconds
+   */
+  public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan();
+      scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  @VisibleForTesting
+  void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      Result next;
+      while ((next = scanner.next()) != null) {
+        stateTable.delete(new Delete(next.getRow()));
+      }
+    }
   }
 
   private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
     return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
   }
 
+  private byte[] makeTimeRegionCountKey(byte[] time) {
+    return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+  }
+
   private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
     return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
   }
@@ -391,6 +514,15 @@
     return Maps.immutableEntry(time, regionName);
   }
 
+  private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+    return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] getEmptyRegionFromKey(byte[] key) {
+    int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+    return Bytes.copy(key, prefixLen, key.length - prefixLen);
+  }
+
   private long getInvertedTime(long time) {
     return Long.MAX_VALUE - time;
   }
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 95216b9..84c480a 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 /**
@@ -200,6 +201,8 @@
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
     LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
     dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
   }
 
   @Override
@@ -288,26 +291,40 @@
       SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
       long time = timeRegions.getTime();
 
-      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        continue;
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
       logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
       // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
       // across all regions
-      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
-        long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
-        LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
-        // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
-        if (inactiveTransactionBound != -1) {
-          Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
-          return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
-                        "and hence the data must be incomplete", time);
-          }
-        }
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
       } else {
         if (LOG.isDebugEnabled()) {
-          Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+          Sets.SetView<byte[]> difference =
+            Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
           LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                     time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
         }
@@ -318,6 +335,28 @@
     return -1;
   }
 
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+    // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+    // for these empty regions as inactiveTransactionBound
+    Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+      }
+    }
+    return Collections.unmodifiableMap(pubWithEmptyRegions);
+  }
+
   private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 7e9d1a3..9773a15 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
   private final TableName tableName;
   private final DataJanitorState dataJanitorState;
   private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
   private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
 
   private long lastChecked;
 
+  @SuppressWarnings("WeakerAccess")
   public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
     this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
     this.pruneFlushInterval = pruneFlushInterval;
     this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   }
 
+  @SuppressWarnings("WeakerAccess")
   public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
     // The number of entries in this map is bound by the number of regions in this region server and thus it will not
     // grow indefinitely
     pruneEntries.put(regionName, pruneUpperBound);
   }
 
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
   public boolean isAlive() {
     return flushThread != null && flushThread.isAlive();
   }
@@ -86,13 +102,22 @@
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              while (pruneEntries.firstEntry() != null) {
+              // Record prune upper bound
+              while (!pruneEntries.isEmpty()) {
                 Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
                 dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
                 // We can now remove the entry only if the key and value match with what we wrote since it is
                 // possible that a new pruneUpperBound for the same key has been added
                 pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+              // Record empty regions
+              while (!emptyRegions.isEmpty()) {
+                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new value for the same key has been added
+                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+              }
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNameWithNamespaceInclAsString(), ex);
@@ -115,4 +140,11 @@
     flushThread.setDaemon(true);
     flushThread.start();
   }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
 }
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 402892f..b96d87d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -156,6 +156,7 @@
     }
 
     // Verify saved regions
+    Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -163,20 +164,39 @@
                         dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
 
+    // Now change the count stored for regions saved at time 0 and 30
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+    }
+    // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
     // Delete regions saved on or before time 30
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
     // Values on or before time 30 should be deleted
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+    // Counts should be deleted for time on or before 30
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+    }
     // Values after time 30 should still exist
     Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+    }
   }
 
   @Test
   public void testSaveInactiveTransactionBoundTime() throws Exception {
     int maxTime = 100;
 
-    // Nothing sould be present in the beginning
+    // Nothing should be present in the beginning
     Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
 
     // Save inactive transaction bounds for various time values
@@ -202,4 +222,59 @@
     Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
     Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
   }
+
+  @Test
+  public void testSaveEmptyRegions() throws Exception {
+    // Nothing should be present in the beginning
+    Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    byte[] region1 = Bytes.toBytes("region1");
+    byte[] region2 = Bytes.toBytes("region2");
+    byte[] region3 = Bytes.toBytes("region3");
+    byte[] region4 = Bytes.toBytes("region4");
+    SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+    // Now record some empty regions
+    dataJanitorState.saveEmptyRegionForTime(100, region1);
+    dataJanitorState.saveEmptyRegionForTime(110, region1);
+    dataJanitorState.saveEmptyRegionForTime(102, region2);
+    dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    Assert.assertEquals(toISet(region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+    Assert.assertEquals(toISet(region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+    // Delete empty regions on or before time 110
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+    // Now only region3 should remain
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    // Delete empty regions on or before time 150
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+    // Now nothing should remain
+    Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+  }
+
+  private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+    ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+    for (byte[] arg : args) {
+      builder.add(arg);
+    }
+    return builder.build();
+  }
 }
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index a431ee3..c99904b 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -72,6 +72,7 @@
 
   private static TableName txDataTable1;
   private static TableName pruneStateTable;
+  private static DataJanitorState dataJanitorState;
 
   // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
   @BeforeClass
@@ -105,6 +106,14 @@
 
     pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                  TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
   }
 
   @AfterClass
@@ -128,7 +137,12 @@
 
   @After
   public void afterTest() throws Exception {
+    // Disable the data table so that prune writer thread gets stopped,
+    // this makes sure that any cached value will not interfere with next test
+    hBaseAdmin.disableTable(txDataTable1);
     deletePruneStateTable();
+    // Enabling the table enables the prune writer thread again
+    hBaseAdmin.enableTable(txDataTable1);
   }
 
   private void deletePruneStateTable() throws Exception {
@@ -138,32 +152,8 @@
     }
   }
 
-  private void truncatePruneStateTable() throws Exception {
-    if (hBaseAdmin.tableExists(pruneStateTable)) {
-      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
-        hBaseAdmin.disableTable(pruneStateTable);
-      }
-      hBaseAdmin.truncateTable(pruneStateTable, true);
-    }
-  }
-
   @Test
   public void testRecordCompactionState() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public Table get() throws IOException {
-          return testUtil.getConnection().getTable(pruneStateTable);
-        }
-      });
-
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -212,10 +202,6 @@
 
   @Test
   public void testRecordCompactionStateNoTable() throws Exception {
-    // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
-    // and make sure a major compaction succeeds
-    deletePruneStateTable();
-
     // Create a new transaction snapshot
     InMemoryTransactionStateCache.setTransactionSnapshot(
       new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -247,24 +233,9 @@
 
   @Test
   public void testPruneUpperBound() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public Table get() throws IOException {
-          return testUtil.getConnection().getTable(pruneStateTable);
-        }
-      });
-
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
 
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -334,6 +305,87 @@
     }
   }
 
+  @Test
+  public void testPruneEmptyTable() throws Exception {
+    // Make sure that empty tables do not block the progress of pruning
+
+    // Create an empty table
+    TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+    HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+                                     Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+
+    try {
+      long now1 = System.currentTimeMillis();
+      long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+      long noPruneUpperBound = -1;
+      long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+                                ImmutableSet.of(expectedPruneUpperBound1),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.compact(txEmptyTable, true);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+      // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+      hBaseAdmin.flush(txEmptyTable);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, again, this time it should work
+      pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Now add some data to the empty table
+      // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+      emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+      emptyHTable.close();
+
+      // Now run another compaction on txDataTable1 with an updated tx snapshot
+      long now2 = System.currentTimeMillis();
+      long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+      // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+      // empty in the previous run with inactiveTxTimeNow1
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+      // However, after compacting txEmptyTable we should get the latest upper bound
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txEmptyTable, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+      pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+    } finally {
+      transactionPruningPlugin.destroy();
+      hBaseAdmin.disableTable(txEmptyTable);
+      hBaseAdmin.deleteTable(txEmptyTable);
+    }
+  }
+
   private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
     HRegionLocation regionLocation =
       testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9e9dd46..7485b91 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
   }
 
   @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+    // Record whether the region is empty after a flush
+    HRegion region = e.getEnvironment().getRegion();
+    // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+    // then the region must be empty
+    long numStoreFiles = numStoreFilesForRegion(e);
+    long memstoreSize = region.getMemstoreSize().get();
+    LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+                            region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+    if (memstoreSize == 0 && numStoreFiles == 0) {
+      if (pruneEnable == null) {
+        initPruneState(e);
+      }
+
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        compactionState.persistRegionEmpty(System.currentTimeMillis());
+      }
+    }
+
+  }
+
+  @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
       CompactionRequest request)
@@ -318,25 +341,7 @@
     TransactionVisibilityState snapshot = cache.getLatestState();
 
     if (pruneEnable == null) {
-      Configuration conf = getConfiguration(c.getEnvironment());
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        if (Boolean.TRUE.equals(pruneEnable)) {
-          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                        + pruneTable);
-          }
-        }
-      }
+      initPruneState(c);
     }
 
     if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
+  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+    Configuration conf = getConfiguration(c.getEnvironment());
+    // Configuration won't be null in TransactionProcessor but the derived classes might return
+    // null if it is not available temporarily
+    if (conf != null) {
+      pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                    TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
+      }
+    }
+  }
+
+  private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+    long numStoreFiles = 0;
+    for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+      numStoreFiles += store.getStorefiles().size();
+    }
+    return numStoreFiles;
+  }
+
   /**
    * Filter used to include cells visible to in-progress transactions on flush and commit.
    */
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index db7880b..9b856d9 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
   }
 
   /**
+   * Persist that the given region is empty at the given time
+   * @param time time in milliseconds
+   */
+  public void persistRegionEmpty(long time) {
+    pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+    }
+  }
+
+  /**
    * Releases the usage {@link PruneUpperBoundWriter}.
    */
   public void stop() {
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 897e00e..fc0ec76 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,11 +54,14 @@
  */
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
+  private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
   public static final byte[] FAMILY = {'f'};
   public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+  private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
 
   private static final byte[] REGION_KEY_PREFIX = {0x1};
   private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
 
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
   private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  // This value can be used when we don't care about the value we write in a column
+  private static final byte[] COL_VAL = Bytes.toBytes('1');
 
   private final TableSupplier stateTableSupplier;
 
@@ -148,7 +163,7 @@
     for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
       resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
     }
-    return resultMap;
+    return Collections.unmodifiableMap(resultMap);
   }
 
   /**
@@ -181,7 +196,7 @@
         }
       }
     }
-    return regionPruneInfos;
+    return Collections.unmodifiableList(regionPruneInfos);
   }
 
   /**
@@ -223,7 +238,7 @@
   // ---------------------------------------------------
   // ------- Methods for regions at a given time -------
   // ---------------------------------------------------
-  // Key: 0x2<time><region-id>
+  // Key: 0x2<inverted time><region-id>
   // Col 't': <empty byte array>
   // ---------------------------------------------------
 
@@ -240,12 +255,22 @@
     try (Table stateTable = stateTableSupplier.get()) {
       for (byte[] region : regions) {
         Put put = new Put(makeTimeRegionKey(timeBytes, region));
-        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
         stateTable.put(put);
       }
+
+      // Save the count of regions as a checksum
+      saveRegionCountForTime(stateTable, timeBytes, regions.size());
     }
   }
 
+  @VisibleForTesting
+  void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+    Put put = new Put(makeTimeRegionCountKey(timeBytes));
+    put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+    stateTable.put(put);
+  }
+
   /**
    * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
    * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
    */
   @Nullable
   public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (Table stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, REGION_TIME_COL);
-
-      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-      long currentRegionTime = -1;
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
-          // Stop if reached next time value
-          if (currentRegionTime == -1) {
-            currentRegionTime = timeRegion.getKey();
-          } else if (timeRegion.getKey() < currentRegionTime) {
-            break;
-          } else if (timeRegion.getKey() > currentRegionTime) {
-            throw new IllegalStateException(
-              String.format("Got out of order time %d when expecting time less than or equal to %d",
-                            timeRegion.getKey(), currentRegionTime));
-          }
-          regions.add(timeRegion.getValue());
+      TimeRegions timeRegions;
+      while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+        int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+        if (count != -1 && count == timeRegions.getRegions().size()) {
+          return timeRegions;
+        } else {
+          LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+                                 timeRegions.getTime(), count, timeRegions.getRegions().size()));
+          time = time - 1;
         }
       }
-      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+      return null;
     }
   }
 
+  @Nullable
+  private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+    scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+    long currentRegionTime = -1;
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Result next;
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      while ((next = scanner.next()) != null) {
+        Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+        // Stop if reached next time value
+        if (currentRegionTime == -1) {
+          currentRegionTime = timeRegion.getKey();
+        } else if (timeRegion.getKey() < currentRegionTime) {
+          break;
+        } else if (timeRegion.getKey() > currentRegionTime) {
+          throw new IllegalStateException(
+            String.format("Got out of order time %d when expecting time less than or equal to %d",
+                          timeRegion.getKey(), currentRegionTime));
+        }
+        regions.add(timeRegion.getValue());
+      }
+    }
+    return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+  }
+
+  @VisibleForTesting
+  int getRegionCountForTime(Table stateTable, long time) throws IOException {
+    Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+    get.addColumn(FAMILY, REGION_TIME_COL);
+    Result result = stateTable.get(get);
+    byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+    return value == null ? -1 : Bytes.toInt(value);
+  }
+
   /**
    * Delete all the regions that were recorded for all times equal or less than the given time.
    *
@@ -294,15 +345,15 @@
   public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
     byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (Table stateTable = stateTableSupplier.get()) {
+      // Delete the regions
       Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
 
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
-        }
-      }
+      // Delete the count
+      scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
     }
   }
 
@@ -356,14 +407,82 @@
       Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
                            INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // --------------------------------------------------------
+  // ------- Methods for empty regions at a given time -------
+  // --------------------------------------------------------
+  // Key: 0x4<time><region-id>
+  // Col 'e': <empty byte array>
+  // --------------------------------------------------------
+
+  /**
+   * Save the given region as empty as of the given time.
+   *
+   * @param time time in milliseconds
+   * @param regionId region id
+   */
+  public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(time);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+      put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return regions that were recorded as empty after the given time.
+   *
+   * @param time time in milliseconds
+   * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+   *                       and the empty regions after the given time
+   */
+  public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+    throws IOException {
+    SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+                           EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
 
       try (ResultScanner scanner = stateTable.getScanner(scan)) {
         Result next;
         while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
+          byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+          if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+            emptyRegions.add(emptyRegion);
+          }
         }
       }
     }
+    return Collections.unmodifiableSortedSet(emptyRegions);
+  }
+
+  /**
+   * Delete empty region records saved on or before the given time.
+   *
+   * @param time time in milliseconds
+   */
+  public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan();
+      scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  @VisibleForTesting
+  void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      Result next;
+      while ((next = scanner.next()) != null) {
+        stateTable.delete(new Delete(next.getRow()));
+      }
+    }
   }
 
   private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
     return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
   }
 
+  private byte[] makeTimeRegionCountKey(byte[] time) {
+    return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+  }
+
   private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
     return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
   }
@@ -391,6 +514,15 @@
     return Maps.immutableEntry(time, regionName);
   }
 
+  private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+    return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] getEmptyRegionFromKey(byte[] key) {
+    int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+    return Bytes.copy(key, prefixLen, key.length - prefixLen);
+  }
+
   private long getInvertedTime(long time) {
     return Long.MAX_VALUE - time;
   }
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index a63cf75..8142601 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 /**
@@ -201,6 +202,8 @@
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
     LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
     dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
   }
 
   @Override
@@ -289,26 +292,40 @@
       SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
       long time = timeRegions.getTime();
 
-      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        continue;
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
       logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
       // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
       // across all regions
-      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
-        long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
-        LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
-        // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
-        if (inactiveTransactionBound != -1) {
-          Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
-          return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
-                        "and hence the data must be incomplete", time);
-          }
-        }
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
       } else {
         if (LOG.isDebugEnabled()) {
-          Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+          Sets.SetView<byte[]> difference =
+            Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
           LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                     time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
         }
@@ -319,6 +336,28 @@
     return -1;
   }
 
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+    // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+    // for these empty regions as inactiveTransactionBound
+    Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+      }
+    }
+    return Collections.unmodifiableMap(pubWithEmptyRegions);
+  }
+
   private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 5a86b4a..beed1ad 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
   private final TableName tableName;
   private final DataJanitorState dataJanitorState;
   private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
   private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
 
   private long lastChecked;
 
+  @SuppressWarnings("WeakerAccess")
   public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
     this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
     this.pruneFlushInterval = pruneFlushInterval;
     this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   }
 
+  @SuppressWarnings("WeakerAccess")
   public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
     // The number of entries in this map is bound by the number of regions in this region server and thus it will not
     // grow indefinitely
     pruneEntries.put(regionName, pruneUpperBound);
   }
 
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
   public boolean isAlive() {
     return flushThread != null && flushThread.isAlive();
   }
@@ -86,13 +102,22 @@
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              while (pruneEntries.firstEntry() != null) {
+              // Record prune upper bound
+              while (!pruneEntries.isEmpty()) {
                 Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
                 dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
                 // We can now remove the entry only if the key and value match with what we wrote since it is
                 // possible that a new pruneUpperBound for the same key has been added
                 pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+              // Record empty regions
+              while (!emptyRegions.isEmpty()) {
+                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new value for the same key has been added
+                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+              }
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
@@ -115,4 +140,11 @@
     flushThread.setDaemon(true);
     flushThread.start();
   }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
 }
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 402892f..b96d87d 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -156,6 +156,7 @@
     }
 
     // Verify saved regions
+    Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -163,20 +164,39 @@
                         dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
 
+    // Now change the count stored for regions saved at time 0 and 30
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+    }
+    // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
     // Delete regions saved on or before time 30
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
     // Values on or before time 30 should be deleted
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+    // Counts should be deleted for time on or before 30
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+    }
     // Values after time 30 should still exist
     Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+    }
   }
 
   @Test
   public void testSaveInactiveTransactionBoundTime() throws Exception {
     int maxTime = 100;
 
-    // Nothing sould be present in the beginning
+    // Nothing should be present in the beginning
     Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
 
     // Save inactive transaction bounds for various time values
@@ -202,4 +222,59 @@
     Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
     Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
   }
+
+  @Test
+  public void testSaveEmptyRegions() throws Exception {
+    // Nothing should be present in the beginning
+    Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    byte[] region1 = Bytes.toBytes("region1");
+    byte[] region2 = Bytes.toBytes("region2");
+    byte[] region3 = Bytes.toBytes("region3");
+    byte[] region4 = Bytes.toBytes("region4");
+    SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+    // Now record some empty regions
+    dataJanitorState.saveEmptyRegionForTime(100, region1);
+    dataJanitorState.saveEmptyRegionForTime(110, region1);
+    dataJanitorState.saveEmptyRegionForTime(102, region2);
+    dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    Assert.assertEquals(toISet(region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+    Assert.assertEquals(toISet(region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+    // Delete empty regions on or before time 110
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+    // Now only region3 should remain
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    // Delete empty regions on or before time 150
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+    // Now nothing should remain
+    Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+  }
+
+  private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+    ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+    for (byte[] arg : args) {
+      builder.add(arg);
+    }
+    return builder.build();
+  }
 }
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index a431ee3..07746d8 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -72,6 +72,7 @@
 
   private static TableName txDataTable1;
   private static TableName pruneStateTable;
+  private static DataJanitorState dataJanitorState;
 
   // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
   @BeforeClass
@@ -105,6 +106,14 @@
 
     pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                  TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
   }
 
   @AfterClass
@@ -128,7 +137,12 @@
 
   @After
   public void afterTest() throws Exception {
+    // Disable the data table so that prune writer thread gets stopped,
+    // this makes sure that any cached value will not interfere with next test
+    hBaseAdmin.disableTable(txDataTable1);
     deletePruneStateTable();
+    // Enabling the table enables the prune writer thread again
+    hBaseAdmin.enableTable(txDataTable1);
   }
 
   private void deletePruneStateTable() throws Exception {
@@ -138,32 +152,8 @@
     }
   }
 
-  private void truncatePruneStateTable() throws Exception {
-    if (hBaseAdmin.tableExists(pruneStateTable)) {
-      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
-        hBaseAdmin.disableTable(pruneStateTable);
-      }
-      hBaseAdmin.truncateTable(pruneStateTable, true);
-    }
-  }
-
   @Test
   public void testRecordCompactionState() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public Table get() throws IOException {
-          return testUtil.getConnection().getTable(pruneStateTable);
-        }
-      });
-
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -212,10 +202,6 @@
 
   @Test
   public void testRecordCompactionStateNoTable() throws Exception {
-    // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
-    // and make sure a major compaction succeeds
-    deletePruneStateTable();
-
     // Create a new transaction snapshot
     InMemoryTransactionStateCache.setTransactionSnapshot(
       new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -247,24 +233,9 @@
 
   @Test
   public void testPruneUpperBound() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public Table get() throws IOException {
-          return testUtil.getConnection().getTable(pruneStateTable);
-        }
-      });
-
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
 
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -334,6 +305,87 @@
     }
   }
 
+  @Test
+  public void testPruneEmptyTable() throws Exception {
+    // Make sure that empty tables do not block the progress of pruning
+
+    // Create an empty table
+    TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+    HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+                                     Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+
+    try {
+      long now1 = System.currentTimeMillis();
+      long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+      long noPruneUpperBound = -1;
+      long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+                                ImmutableSet.of(expectedPruneUpperBound1),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.compact(txEmptyTable, true);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+      // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+      hBaseAdmin.flush(txEmptyTable);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, again, this time it should work
+      pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Now add some data to the empty table
+      // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+      emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+      emptyHTable.close();
+
+      // Now run another compaction on txDataTable1 with an updated tx snapshot
+      long now2 = System.currentTimeMillis();
+      long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+      // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+      // empty in the previous run with inactiveTxTimeNow1
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+      // However, after compacting txEmptyTable we should get the latest upper bound
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txEmptyTable, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+      pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+    } finally {
+      transactionPruningPlugin.destroy();
+      hBaseAdmin.disableTable(txEmptyTable);
+      hBaseAdmin.deleteTable(txEmptyTable);
+    }
+    }
+
   private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
     HRegionLocation regionLocation =
       testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 015077b..5e1b4c5 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -310,6 +311,28 @@
   }
 
   @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+    // Record whether the region is empty after a flush
+    Region region = e.getEnvironment().getRegion();
+    // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+    // then the region must be empty
+    long numStoreFiles = numStoreFilesForRegion(e);
+    long memstoreSize = region.getMemstoreSize();
+    LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+                            region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+    if (memstoreSize == 0 && numStoreFiles == 0) {
+      if (pruneEnable == null) {
+        initPruneState(e);
+      }
+
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        compactionState.persistRegionEmpty(System.currentTimeMillis());
+      }
+    }
+
+  }
+
+  @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
       CompactionRequest request)
@@ -318,25 +341,7 @@
     TransactionVisibilityState snapshot = cache.getLatestState();
 
     if (pruneEnable == null) {
-      Configuration conf = getConfiguration(c.getEnvironment());
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        if (Boolean.TRUE.equals(pruneEnable)) {
-          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                        + pruneTable);
-          }
-        }
-      }
+      initPruneState(c);
     }
 
     if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
+  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+    Configuration conf = getConfiguration(c.getEnvironment());
+    // Configuration won't be null in TransactionProcessor but the derived classes might return
+    // null if it is not available temporarily
+    if (conf != null) {
+      pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                    TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
+      }
+    }
+  }
+
+  private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+    long numStoreFiles = 0;
+    for (Store store : c.getEnvironment().getRegion().getStores()) {
+      numStoreFiles += store.getStorefiles().size();
+    }
+    return numStoreFiles;
+  }
+
   /**
    * Filter used to include cells visible to in-progress transactions on flush and commit.
    */
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index db7880b..9b856d9 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
   }
 
   /**
+   * Persist that the given region is empty at the given time
+   * @param time time in milliseconds
+   */
+  public void persistRegionEmpty(long time) {
+    pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+    }
+  }
+
+  /**
    * Releases the usage {@link PruneUpperBoundWriter}.
    */
   public void stop() {
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 897e00e..fc0ec76 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,11 +54,14 @@
  */
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
+  private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
   public static final byte[] FAMILY = {'f'};
   public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+  private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
 
   private static final byte[] REGION_KEY_PREFIX = {0x1};
   private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
 
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
   private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  // This value can be used when we don't care about the value we write in a column
+  private static final byte[] COL_VAL = Bytes.toBytes('1');
 
   private final TableSupplier stateTableSupplier;
 
@@ -148,7 +163,7 @@
     for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
       resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
     }
-    return resultMap;
+    return Collections.unmodifiableMap(resultMap);
   }
 
   /**
@@ -181,7 +196,7 @@
         }
       }
     }
-    return regionPruneInfos;
+    return Collections.unmodifiableList(regionPruneInfos);
   }
 
   /**
@@ -223,7 +238,7 @@
   // ---------------------------------------------------
   // ------- Methods for regions at a given time -------
   // ---------------------------------------------------
-  // Key: 0x2<time><region-id>
+  // Key: 0x2<inverted time><region-id>
   // Col 't': <empty byte array>
   // ---------------------------------------------------
 
@@ -240,12 +255,22 @@
     try (Table stateTable = stateTableSupplier.get()) {
       for (byte[] region : regions) {
         Put put = new Put(makeTimeRegionKey(timeBytes, region));
-        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
         stateTable.put(put);
       }
+
+      // Save the count of regions as a checksum
+      saveRegionCountForTime(stateTable, timeBytes, regions.size());
     }
   }
 
+  @VisibleForTesting
+  void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+    Put put = new Put(makeTimeRegionCountKey(timeBytes));
+    put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+    stateTable.put(put);
+  }
+
   /**
    * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
    * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
    */
   @Nullable
   public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (Table stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, REGION_TIME_COL);
-
-      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-      long currentRegionTime = -1;
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
-          // Stop if reached next time value
-          if (currentRegionTime == -1) {
-            currentRegionTime = timeRegion.getKey();
-          } else if (timeRegion.getKey() < currentRegionTime) {
-            break;
-          } else if (timeRegion.getKey() > currentRegionTime) {
-            throw new IllegalStateException(
-              String.format("Got out of order time %d when expecting time less than or equal to %d",
-                            timeRegion.getKey(), currentRegionTime));
-          }
-          regions.add(timeRegion.getValue());
+      TimeRegions timeRegions;
+      while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+        int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+        if (count != -1 && count == timeRegions.getRegions().size()) {
+          return timeRegions;
+        } else {
+          LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+                                 timeRegions.getTime(), count, timeRegions.getRegions().size()));
+          time = time - 1;
         }
       }
-      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+      return null;
     }
   }
 
+  @Nullable
+  private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+    scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+    long currentRegionTime = -1;
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Result next;
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      while ((next = scanner.next()) != null) {
+        Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+        // Stop if reached next time value
+        if (currentRegionTime == -1) {
+          currentRegionTime = timeRegion.getKey();
+        } else if (timeRegion.getKey() < currentRegionTime) {
+          break;
+        } else if (timeRegion.getKey() > currentRegionTime) {
+          throw new IllegalStateException(
+            String.format("Got out of order time %d when expecting time less than or equal to %d",
+                          timeRegion.getKey(), currentRegionTime));
+        }
+        regions.add(timeRegion.getValue());
+      }
+    }
+    return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+  }
+
+  @VisibleForTesting
+  int getRegionCountForTime(Table stateTable, long time) throws IOException {
+    Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+    get.addColumn(FAMILY, REGION_TIME_COL);
+    Result result = stateTable.get(get);
+    byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+    return value == null ? -1 : Bytes.toInt(value);
+  }
+
   /**
    * Delete all the regions that were recorded for all times equal or less than the given time.
    *
@@ -294,15 +345,15 @@
   public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
     byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
     try (Table stateTable = stateTableSupplier.get()) {
+      // Delete the regions
       Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
 
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
-        }
-      }
+      // Delete the count
+      scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
     }
   }
 
@@ -356,14 +407,82 @@
       Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
                            INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
       scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // --------------------------------------------------------
+  // ------- Methods for empty regions at a given time -------
+  // --------------------------------------------------------
+  // Key: 0x4<time><region-id>
+  // Col 'e': <empty byte array>
+  // --------------------------------------------------------
+
+  /**
+   * Save the given region as empty as of the given time.
+   *
+   * @param time time in milliseconds
+   * @param regionId region id
+   */
+  public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(time);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+      put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return regions that were recorded as empty after the given time.
+   *
+   * @param time time in milliseconds
+   * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+   *                       and the empty regions after the given time
+   */
+  public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+    throws IOException {
+    SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+                           EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
 
       try (ResultScanner scanner = stateTable.getScanner(scan)) {
         Result next;
         while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
+          byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+          if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+            emptyRegions.add(emptyRegion);
+          }
         }
       }
     }
+    return Collections.unmodifiableSortedSet(emptyRegions);
+  }
+
+  /**
+   * Delete empty region records saved on or before the given time.
+   *
+   * @param time time in milliseconds
+   */
+  public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan();
+      scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  @VisibleForTesting
+  void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      Result next;
+      while ((next = scanner.next()) != null) {
+        stateTable.delete(new Delete(next.getRow()));
+      }
+    }
   }
 
   private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
     return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
   }
 
+  private byte[] makeTimeRegionCountKey(byte[] time) {
+    return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+  }
+
   private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
     return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
   }
@@ -391,6 +514,15 @@
     return Maps.immutableEntry(time, regionName);
   }
 
+  private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+    return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] getEmptyRegionFromKey(byte[] key) {
+    int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+    return Bytes.copy(key, prefixLen, key.length - prefixLen);
+  }
+
   private long getInvertedTime(long time) {
     return Long.MAX_VALUE - time;
   }
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 99c514f..84c480a 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 /**
@@ -121,7 +122,7 @@
   public void initialize(Configuration conf) throws IOException {
     this.conf = conf;
     this.connection = ConnectionFactory.createConnection(conf);
-    
+
     final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                             TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
@@ -200,6 +201,8 @@
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
     LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
     dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
   }
 
   @Override
@@ -288,26 +291,40 @@
       SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
       long time = timeRegions.getTime();
 
-      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        continue;
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
       logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
       // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
       // across all regions
-      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
-        long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
-        LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
-        // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
-        if (inactiveTransactionBound != -1) {
-          Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
-          return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
-                        "and hence the data must be incomplete", time);
-          }
-        }
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
       } else {
         if (LOG.isDebugEnabled()) {
-          Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+          Sets.SetView<byte[]> difference =
+            Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
           LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                     time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
         }
@@ -318,6 +335,28 @@
     return -1;
   }
 
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+    // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+    // for these empty regions as inactiveTransactionBound
+    Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+      }
+    }
+    return Collections.unmodifiableMap(pubWithEmptyRegions);
+  }
+
   private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 7e9d1a3..9773a15 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
   private final TableName tableName;
   private final DataJanitorState dataJanitorState;
   private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
   private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
 
   private long lastChecked;
 
+  @SuppressWarnings("WeakerAccess")
   public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
     this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
     this.pruneFlushInterval = pruneFlushInterval;
     this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   }
 
+  @SuppressWarnings("WeakerAccess")
   public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
     // The number of entries in this map is bound by the number of regions in this region server and thus it will not
     // grow indefinitely
     pruneEntries.put(regionName, pruneUpperBound);
   }
 
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
   public boolean isAlive() {
     return flushThread != null && flushThread.isAlive();
   }
@@ -86,13 +102,22 @@
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              while (pruneEntries.firstEntry() != null) {
+              // Record prune upper bound
+              while (!pruneEntries.isEmpty()) {
                 Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
                 dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
                 // We can now remove the entry only if the key and value match with what we wrote since it is
                 // possible that a new pruneUpperBound for the same key has been added
                 pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+              // Record empty regions
+              while (!emptyRegions.isEmpty()) {
+                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new value for the same key has been added
+                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+              }
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNameWithNamespaceInclAsString(), ex);
@@ -115,4 +140,11 @@
     flushThread.setDaemon(true);
     flushThread.start();
   }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
 }
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 402892f..b96d87d 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -156,6 +156,7 @@
     }
 
     // Verify saved regions
+    Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
     Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -163,20 +164,39 @@
                         dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
 
+    // Now change the count stored for regions saved at time 0 and 30
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+      dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+    }
+    // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
     // Delete regions saved on or before time 30
     dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
     // Values on or before time 30 should be deleted
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
     Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+    // Counts should be deleted for time on or before 30
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+      Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+    }
     // Values after time 30 should still exist
     Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+    try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+      Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+    }
   }
 
   @Test
   public void testSaveInactiveTransactionBoundTime() throws Exception {
     int maxTime = 100;
 
-    // Nothing sould be present in the beginning
+    // Nothing should be present in the beginning
     Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
 
     // Save inactive transaction bounds for various time values
@@ -202,4 +222,59 @@
     Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
     Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
   }
+
+  @Test
+  public void testSaveEmptyRegions() throws Exception {
+    // Nothing should be present in the beginning
+    Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    byte[] region1 = Bytes.toBytes("region1");
+    byte[] region2 = Bytes.toBytes("region2");
+    byte[] region3 = Bytes.toBytes("region3");
+    byte[] region4 = Bytes.toBytes("region4");
+    SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+    // Now record some empty regions
+    dataJanitorState.saveEmptyRegionForTime(100, region1);
+    dataJanitorState.saveEmptyRegionForTime(110, region1);
+    dataJanitorState.saveEmptyRegionForTime(102, region2);
+    dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+    Assert.assertEquals(toISet(region1, region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    Assert.assertEquals(toISet(region2, region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+    Assert.assertEquals(toISet(region3),
+                        dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+    Assert.assertEquals(toISet(),
+                        dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+    // Delete empty regions on or before time 110
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+    // Now only region3 should remain
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+    Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+    // Delete empty regions on or before time 150
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+    // Now nothing should remain
+    Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+  }
+
+  private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+    ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+    for (byte[] arg : args) {
+      builder.add(arg);
+    }
+    return builder.build();
+  }
 }
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index a431ee3..07746d8 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -72,6 +72,7 @@
 
   private static TableName txDataTable1;
   private static TableName pruneStateTable;
+  private static DataJanitorState dataJanitorState;
 
   // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
   @BeforeClass
@@ -105,6 +106,14 @@
 
     pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                  TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
   }
 
   @AfterClass
@@ -128,7 +137,12 @@
 
   @After
   public void afterTest() throws Exception {
+    // Disable the data table so that prune writer thread gets stopped,
+    // this makes sure that any cached value will not interfere with next test
+    hBaseAdmin.disableTable(txDataTable1);
     deletePruneStateTable();
+    // Enabling the table enables the prune writer thread again
+    hBaseAdmin.enableTable(txDataTable1);
   }
 
   private void deletePruneStateTable() throws Exception {
@@ -138,32 +152,8 @@
     }
   }
 
-  private void truncatePruneStateTable() throws Exception {
-    if (hBaseAdmin.tableExists(pruneStateTable)) {
-      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
-        hBaseAdmin.disableTable(pruneStateTable);
-      }
-      hBaseAdmin.truncateTable(pruneStateTable, true);
-    }
-  }
-
   @Test
   public void testRecordCompactionState() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public Table get() throws IOException {
-          return testUtil.getConnection().getTable(pruneStateTable);
-        }
-      });
-
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -212,10 +202,6 @@
 
   @Test
   public void testRecordCompactionStateNoTable() throws Exception {
-    // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
-    // and make sure a major compaction succeeds
-    deletePruneStateTable();
-
     // Create a new transaction snapshot
     InMemoryTransactionStateCache.setTransactionSnapshot(
       new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -247,24 +233,9 @@
 
   @Test
   public void testPruneUpperBound() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public Table get() throws IOException {
-          return testUtil.getConnection().getTable(pruneStateTable);
-        }
-      });
-
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
 
-    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
-    TimeUnit.SECONDS.sleep(2);
-    // Truncate prune state table to clear any data that might have been written by the previous test
-    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
-    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
-    truncatePruneStateTable();
-
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -334,6 +305,87 @@
     }
   }
 
+  @Test
+  public void testPruneEmptyTable() throws Exception {
+    // Make sure that empty tables do not block the progress of pruning
+
+    // Create an empty table
+    TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+    HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+                                     Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+
+    try {
+      long now1 = System.currentTimeMillis();
+      long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+      long noPruneUpperBound = -1;
+      long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+                                ImmutableSet.of(expectedPruneUpperBound1),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.compact(txEmptyTable, true);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+      // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+      hBaseAdmin.flush(txEmptyTable);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, again, this time it should work
+      pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Now add some data to the empty table
+      // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+      emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+      emptyHTable.close();
+
+      // Now run another compaction on txDataTable1 with an updated tx snapshot
+      long now2 = System.currentTimeMillis();
+      long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+      // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+      // empty in the previous run with inactiveTxTimeNow1
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+      // However, after compacting txEmptyTable we should get the latest upper bound
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txEmptyTable, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+      pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+    } finally {
+      transactionPruningPlugin.destroy();
+      hBaseAdmin.disableTable(txEmptyTable);
+      hBaseAdmin.deleteTable(txEmptyTable);
+    }
+    }
+
   private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
     HRegionLocation regionLocation =
       testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);