TEPHRA-216 Handle empty transactional regions during inactive list pruning
This closes #34
Signed-off-by: poorna <poorna@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
index 8ea5a11..d73c50a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
@@ -93,7 +93,8 @@
}
}
if (toTruncate.isEmpty()) {
- LOG.info("Not pruning invalid list since no invalid id is less than or equal to the minimum prune upper bound");
+ LOG.info("Not pruning invalid list since the min prune upper bound {} is greater than the min invalid id {}",
+ minPruneUpperBound, invalids[0]);
return;
}
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
index 52d7279..d80bbd4 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
@@ -68,7 +69,11 @@
}
LOG.info("Starting {}...", this.getClass().getSimpleName());
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("tephra-pruning-thread")
+ .setDaemon(true)
+ .build());
Map<String, TransactionPruningPlugin> plugins = initializePlugins();
long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 53a8957..9ff4d3b 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
}
@Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+ // Record whether the region is empty after a flush
+ HRegion region = e.getEnvironment().getRegion();
+ // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+ // then the region must be empty
+ long numStoreFiles = numStoreFilesForRegion(e);
+ long memstoreSize = region.getMemstoreSize().get();
+ LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+ region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+ if (memstoreSize == 0 && numStoreFiles == 0) {
+ if (pruneEnable == null) {
+ initPruneState(e);
+ }
+
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ compactionState.persistRegionEmpty(System.currentTimeMillis());
+ }
+ }
+
+ }
+
+ @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
@@ -318,25 +341,7 @@
TransactionVisibilityState snapshot = cache.getLatestState();
if (pruneEnable == null) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
- }
- }
- }
+ initPruneState(c);
}
if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
+ private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+ Configuration conf = getConfiguration(c.getEnvironment());
+ // Configuration won't be null in TransactionProcessor but the derived classes might return
+ // null if it is not available temporarily
+ if (conf != null) {
+ pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
+ }
+ }
+ }
+
+ private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+ long numStoreFiles = 0;
+ for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+ numStoreFiles += store.getStorefiles().size();
+ }
+ return numStoreFiles;
+ }
+
/**
* Filter used to include cells visible to in-progress transactions on flush and commit.
*/
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index c1f1825..7060244 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
}
/**
+ * Persist that the given region is empty at the given time
+ * @param time time in milliseconds
+ */
+ public void persistRegionEmpty(long time) {
+ pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+ }
+ }
+
+ /**
* Releases the usage {@link PruneUpperBoundWriter}.
*/
public void stop() {
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 979eb1a..4345fe6 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
package org.apache.tephra.hbase.txprune;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,11 +54,14 @@
*/
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
+ private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
public static final byte[] FAMILY = {'f'};
public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+ private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
private static final byte[] REGION_KEY_PREFIX = {0x1};
private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ // This value can be used when we don't care about the value we write in a column
+ private static final byte[] COL_VAL = Bytes.toBytes('1');
private final TableSupplier stateTableSupplier;
@@ -148,7 +163,7 @@
for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
}
- return resultMap;
+ return Collections.unmodifiableMap(resultMap);
}
/**
@@ -181,7 +196,7 @@
}
}
}
- return regionPruneInfos;
+ return Collections.unmodifiableList(regionPruneInfos);
}
/**
@@ -223,7 +238,7 @@
// ---------------------------------------------------
// ------- Methods for regions at a given time -------
// ---------------------------------------------------
- // Key: 0x2<time><region-id>
+ // Key: 0x2<inverted time><region-id>
// Col 't': <empty byte array>
// ---------------------------------------------------
@@ -240,12 +255,22 @@
try (HTableInterface stateTable = stateTableSupplier.get()) {
for (byte[] region : regions) {
Put put = new Put(makeTimeRegionKey(timeBytes, region));
- put.add(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+ put.add(FAMILY, REGION_TIME_COL, COL_VAL);
stateTable.put(put);
}
+
+ // Save the count of regions as a checksum
+ saveRegionCountForTime(stateTable, timeBytes, regions.size());
}
}
+ @VisibleForTesting
+ void saveRegionCountForTime(HTableInterface stateTable, byte[] timeBytes, int count) throws IOException {
+ Put put = new Put(makeTimeRegionCountKey(timeBytes));
+ put.add(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+ stateTable.put(put);
+ }
+
/**
* Return the set of regions saved for the time at or before the given time. This method finds the greatest time
* that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
*/
@Nullable
public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
- byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (HTableInterface stateTable = stateTableSupplier.get()) {
- Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
- scan.addColumn(FAMILY, REGION_TIME_COL);
-
- SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- long currentRegionTime = -1;
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
- // Stop if reached next time value
- if (currentRegionTime == -1) {
- currentRegionTime = timeRegion.getKey();
- } else if (timeRegion.getKey() < currentRegionTime) {
- break;
- } else if (timeRegion.getKey() > currentRegionTime) {
- throw new IllegalStateException(
- String.format("Got out of order time %d when expecting time less than or equal to %d",
- timeRegion.getKey(), currentRegionTime));
- }
- regions.add(timeRegion.getValue());
+ TimeRegions timeRegions;
+ while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+ int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+ if (count != -1 && count == timeRegions.getRegions().size()) {
+ return timeRegions;
+ } else {
+ LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+ timeRegions.getTime(), count, timeRegions.getRegions().size()));
+ time = time - 1;
}
}
- return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+ return null;
}
}
+ @Nullable
+ private TimeRegions getNextSetOfTimeRegions(HTableInterface stateTable, long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+ long currentRegionTime = -1;
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Result next;
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ while ((next = scanner.next()) != null) {
+ Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+ // Stop if reached next time value
+ if (currentRegionTime == -1) {
+ currentRegionTime = timeRegion.getKey();
+ } else if (timeRegion.getKey() < currentRegionTime) {
+ break;
+ } else if (timeRegion.getKey() > currentRegionTime) {
+ throw new IllegalStateException(
+ String.format("Got out of order time %d when expecting time less than or equal to %d",
+ timeRegion.getKey(), currentRegionTime));
+ }
+ regions.add(timeRegion.getValue());
+ }
+ }
+ return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+ }
+
+ @VisibleForTesting
+ int getRegionCountForTime(HTableInterface stateTable, long time) throws IOException {
+ Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, REGION_TIME_COL);
+ Result result = stateTable.get(get);
+ byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+ return value == null ? -1 : Bytes.toInt(value);
+ }
+
/**
* Delete all the regions that were recorded for all times equal or less than the given time.
*
@@ -294,15 +345,15 @@
public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (HTableInterface stateTable = stateTableSupplier.get()) {
+ // Delete the regions
Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
- }
- }
+ // Delete the count
+ scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
}
}
@@ -356,14 +407,82 @@
Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ // --------------------------------------------------------
+ // ------- Methods for empty regions at a given time -------
+ // --------------------------------------------------------
+ // Key: 0x4<time><region-id>
+ // Col 'e': <empty byte array>
+ // --------------------------------------------------------
+
+ /**
+ * Save the given region as empty as of the given time.
+ *
+ * @param time time in milliseconds
+ * @param regionId region id
+ */
+ public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(time);
+ try (HTableInterface stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+ put.add(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return regions that were recorded as empty after the given time.
+ *
+ * @param time time in milliseconds
+ * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+ * and the empty regions after the given time
+ */
+ public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+ throws IOException {
+ SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (HTableInterface stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+ EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
try (ResultScanner scanner = stateTable.getScanner(scan)) {
Result next;
while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
+ byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+ if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+ emptyRegions.add(emptyRegion);
+ }
}
}
}
+ return Collections.unmodifiableSortedSet(emptyRegions);
+ }
+
+ /**
+ * Delete empty region records saved on or before the given time.
+ *
+ * @param time time in milliseconds
+ */
+ public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+ try (HTableInterface stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan();
+ scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ @VisibleForTesting
+ void deleteFromScan(HTableInterface stateTable, Scan scan) throws IOException {
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
}
private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
}
+ private byte[] makeTimeRegionCountKey(byte[] time) {
+ return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+ }
+
private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
}
@@ -391,6 +514,15 @@
return Maps.immutableEntry(time, regionName);
}
+ private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+ return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+ }
+
+ private byte[] getEmptyRegionFromKey(byte[] key) {
+ int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+ return Bytes.copy(key, prefixLen, key.length - prefixLen);
+ }
+
private long getInvertedTime(long time) {
return Long.MAX_VALUE - time;
}
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 80da8d8..021f1b2 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
/**
@@ -203,6 +204,8 @@
dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
}
@Override
@@ -295,26 +298,40 @@
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
long time = timeRegions.getTime();
- Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+ long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+ LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+ // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+ if (inactiveTransactionBound == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+ "and hence the data must be incomplete", time);
+ }
+ continue;
+ }
+
+ // Get the prune upper bounds for all the transactional regions
+ Map<byte[], Long> pruneUpperBoundRegions =
+ dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+ // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+ // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started on or before inactiveTransactionBoundTime
+ pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+ pruneUpperBoundRegions);
+
// If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
// across all regions
- if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
- long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
- LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
- // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
- if (inactiveTransactionBound != -1) {
- Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
- return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
- "and hence the data must be incomplete", time);
- }
- }
+ if (!transactionalRegions.isEmpty() &&
+ pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+ Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+ long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+ LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+ return pruneUpperBound;
} else {
if (LOG.isDebugEnabled()) {
- Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+ Sets.SetView<byte[]> difference =
+ Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
}
@@ -325,6 +342,28 @@
return -1;
}
+ private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+ SortedSet<byte[]> transactionalRegions,
+ Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+ long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+ SortedSet<byte[]> emptyRegions =
+ dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+ LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+ inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+ // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+ // for these empty regions as inactiveTransactionBound
+ Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+ for (byte[] emptyRegion : emptyRegions) {
+ if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+ pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+ }
+ }
+ return Collections.unmodifiableMap(pubWithEmptyRegions);
+ }
+
private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 5a86b4a..beed1ad 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
private final TableName tableName;
private final DataJanitorState dataJanitorState;
private final long pruneFlushInterval;
+ // Map of region name -> prune upper bound
private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+ // Map of region name -> time the region was found to be empty
+ private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
private long lastChecked;
+ @SuppressWarnings("WeakerAccess")
public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
this.tableName = tableName;
this.dataJanitorState = dataJanitorState;
this.pruneFlushInterval = pruneFlushInterval;
this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
}
+ @SuppressWarnings("WeakerAccess")
public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+ warnIfNotRunning(regionName);
// The number of entries in this map is bound by the number of regions in this region server and thus it will not
// grow indefinitely
pruneEntries.put(regionName, pruneUpperBound);
}
+ @SuppressWarnings("WeakerAccess")
+ public void persistRegionEmpty(byte[] regionName, long time) {
+ warnIfNotRunning(regionName);
+ // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+ // grow indefinitely
+ emptyRegions.put(regionName, time);
+ }
+
+ @SuppressWarnings("WeakerAccess")
public boolean isAlive() {
return flushThread != null && flushThread.isAlive();
}
@@ -86,13 +102,22 @@
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- while (pruneEntries.firstEntry() != null) {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
// We can now remove the entry only if the key and value match with what we wrote since it is
// possible that a new pruneUpperBound for the same key has been added
pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
}
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
@@ -115,4 +140,11 @@
flushThread.setDaemon(true);
flushThread.start();
}
+
+ private void warnIfNotRunning(byte[] regionName) {
+ if (!isRunning() || !isAlive()) {
+ LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+ Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+ }
+ }
}
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 3ae0423..14bf96c 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -161,6 +161,7 @@
}
// Verify saved regions
+ Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -168,20 +169,39 @@
dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+ // Now change the count stored for regions saved at time 0 and 30
+ try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+ }
+ // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
// Delete regions saved on or before time 30
dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
// Values on or before time 30 should be deleted
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Counts should be deleted for time on or before 30
+ try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+ }
// Values after time 30 should still exist
Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+ try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+ Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+ }
}
@Test
public void testSaveInactiveTransactionBoundTime() throws Exception {
int maxTime = 100;
- // Nothing sould be present in the beginning
+ // Nothing should be present in the beginning
Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
// Save inactive transaction bounds for various time values
@@ -207,4 +227,59 @@
Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
}
+
+ @Test
+ public void testSaveEmptyRegions() throws Exception {
+ // Nothing should be present in the beginning
+ Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ byte[] region1 = Bytes.toBytes("region1");
+ byte[] region2 = Bytes.toBytes("region2");
+ byte[] region3 = Bytes.toBytes("region3");
+ byte[] region4 = Bytes.toBytes("region4");
+ SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+ // Now record some empty regions
+ dataJanitorState.saveEmptyRegionForTime(100, region1);
+ dataJanitorState.saveEmptyRegionForTime(110, region1);
+ dataJanitorState.saveEmptyRegionForTime(102, region2);
+ dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ Assert.assertEquals(toISet(region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+ Assert.assertEquals(toISet(region3),
+ dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+ // Delete empty regions on or before time 110
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+ // Now only region3 should remain
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ // Delete empty regions on or before time 150
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+ // Now nothing should remain
+ Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ }
+
+ private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+ ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+ for (byte[] arg : args) {
+ builder.add(arg);
+ }
+ return builder.build();
+ }
}
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index fbd4d7d..e3f5c6b 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -74,8 +74,9 @@
private static TableName txDataTable1;
private static TableName pruneStateTable;
+ private static DataJanitorState dataJanitorState;
- private HConnection connection;
+ private static HConnection connection;
// Override AbstractHBaseTableTest.startMiniCluster to setup configuration
@BeforeClass
@@ -109,17 +110,25 @@
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ connection = HConnectionManager.createConnection(conf);
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public HTableInterface get() throws IOException {
+ return connection.getTable(pruneStateTable);
+ }
+ });
}
@AfterClass
public static void shutdownAfterClass() throws Exception {
+ connection.close();
hBaseAdmin.disableTable(txDataTable1);
hBaseAdmin.deleteTable(txDataTable1);
}
@Before
public void beforeTest() throws Exception {
- connection = HConnectionManager.createConnection(conf);
createPruneStateTable();
InMemoryTransactionStateCache.setTransactionSnapshot(null);
}
@@ -133,8 +142,12 @@
@After
public void afterTest() throws Exception {
+ // Disable the data table so that prune writer thread gets stopped,
+ // this makes sure that any cached value will not interfere with next test
+ hBaseAdmin.disableTable(txDataTable1);
deletePruneStateTable();
- connection.close();
+ // Enabling the table enables the prune writer thread again
+ hBaseAdmin.enableTable(txDataTable1);
}
private void deletePruneStateTable() throws Exception {
@@ -144,34 +157,8 @@
}
}
- private void truncatePruneStateTable() throws Exception {
- if (hBaseAdmin.tableExists(pruneStateTable)) {
- if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
- hBaseAdmin.disableTable(pruneStateTable);
- }
- HTableDescriptor htd = hBaseAdmin.getTableDescriptor(pruneStateTable);
- hBaseAdmin.deleteTable(pruneStateTable);
- hBaseAdmin.createTable(htd);
- }
- }
-
@Test
public void testRecordCompactionState() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public HTableInterface get() throws IOException {
- return connection.getTable(pruneStateTable);
- }
- });
-
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -220,10 +207,6 @@
@Test
public void testRecordCompactionStateNoTable() throws Exception {
- // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
- // and make sure a major compaction succeeds
- deletePruneStateTable();
-
// Create a new transaction snapshot
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -255,23 +238,9 @@
@Test
public void testPruneUpperBound() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public HTableInterface get() throws IOException {
- return connection.getTable(pruneStateTable);
- }
- });
-
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
+
try {
// Run without a transaction snapshot first
long now1 = 200;
@@ -341,6 +310,87 @@
}
}
+ @Test
+ public void testPruneEmptyTable() throws Exception {
+ // Make sure that empty tables do not block the progress of pruning
+
+ // Create an empty table
+ TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+ HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.compact(txEmptyTable, true);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+ testUtil.flush(txEmptyTable);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, again, this time it should work
+ pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Now add some data to the empty table
+ // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+ emptyHTable.put(new Put(Bytes.toBytes(1)).add(family, qualifier, Bytes.toBytes(1)));
+ emptyHTable.close();
+
+ // Now run another compaction on txDataTable1 with an updated tx snapshot
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+ // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+ // empty in the previous run with inactiveTxTimeNow1
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+ // However, after compacting txEmptyTable we should get the latest upper bound
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txEmptyTable, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+ } finally {
+ transactionPruningPlugin.destroy();
+ hBaseAdmin.disableTable(txEmptyTable);
+ hBaseAdmin.deleteTable(txEmptyTable);
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true);
return regionLocation.getRegionInfo().getRegionName();
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9e9dd46..7485b91 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
}
@Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+ // Record whether the region is empty after a flush
+ HRegion region = e.getEnvironment().getRegion();
+ // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+ // then the region must be empty
+ long numStoreFiles = numStoreFilesForRegion(e);
+ long memstoreSize = region.getMemstoreSize().get();
+ LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+ region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+ if (memstoreSize == 0 && numStoreFiles == 0) {
+ if (pruneEnable == null) {
+ initPruneState(e);
+ }
+
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ compactionState.persistRegionEmpty(System.currentTimeMillis());
+ }
+ }
+
+ }
+
+ @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
@@ -318,25 +341,7 @@
TransactionVisibilityState snapshot = cache.getLatestState();
if (pruneEnable == null) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
- }
- }
- }
+ initPruneState(c);
}
if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
+ private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+ Configuration conf = getConfiguration(c.getEnvironment());
+ // Configuration won't be null in TransactionProcessor but the derived classes might return
+ // null if it is not available temporarily
+ if (conf != null) {
+ pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
+ }
+ }
+ }
+
+ private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+ long numStoreFiles = 0;
+ for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+ numStoreFiles += store.getStorefiles().size();
+ }
+ return numStoreFiles;
+ }
+
/**
* Filter used to include cells visible to in-progress transactions on flush and commit.
*/
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index c1f1825..7060244 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
}
/**
+ * Persist that the given region is empty at the given time
+ * @param time time in milliseconds
+ */
+ public void persistRegionEmpty(long time) {
+ pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+ }
+ }
+
+ /**
* Releases the usage {@link PruneUpperBoundWriter}.
*/
public void stop() {
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 979eb1a..4345fe6 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
package org.apache.tephra.hbase.txprune;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,11 +54,14 @@
*/
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
+ private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
public static final byte[] FAMILY = {'f'};
public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+ private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
private static final byte[] REGION_KEY_PREFIX = {0x1};
private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ // This value can be used when we don't care about the value we write in a column
+ private static final byte[] COL_VAL = Bytes.toBytes('1');
private final TableSupplier stateTableSupplier;
@@ -148,7 +163,7 @@
for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
}
- return resultMap;
+ return Collections.unmodifiableMap(resultMap);
}
/**
@@ -181,7 +196,7 @@
}
}
}
- return regionPruneInfos;
+ return Collections.unmodifiableList(regionPruneInfos);
}
/**
@@ -223,7 +238,7 @@
// ---------------------------------------------------
// ------- Methods for regions at a given time -------
// ---------------------------------------------------
- // Key: 0x2<time><region-id>
+ // Key: 0x2<inverted time><region-id>
// Col 't': <empty byte array>
// ---------------------------------------------------
@@ -240,12 +255,22 @@
try (HTableInterface stateTable = stateTableSupplier.get()) {
for (byte[] region : regions) {
Put put = new Put(makeTimeRegionKey(timeBytes, region));
- put.add(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+ put.add(FAMILY, REGION_TIME_COL, COL_VAL);
stateTable.put(put);
}
+
+ // Save the count of regions as a checksum
+ saveRegionCountForTime(stateTable, timeBytes, regions.size());
}
}
+ @VisibleForTesting
+ void saveRegionCountForTime(HTableInterface stateTable, byte[] timeBytes, int count) throws IOException {
+ Put put = new Put(makeTimeRegionCountKey(timeBytes));
+ put.add(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+ stateTable.put(put);
+ }
+
/**
* Return the set of regions saved for the time at or before the given time. This method finds the greatest time
* that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
*/
@Nullable
public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
- byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (HTableInterface stateTable = stateTableSupplier.get()) {
- Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
- scan.addColumn(FAMILY, REGION_TIME_COL);
-
- SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- long currentRegionTime = -1;
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
- // Stop if reached next time value
- if (currentRegionTime == -1) {
- currentRegionTime = timeRegion.getKey();
- } else if (timeRegion.getKey() < currentRegionTime) {
- break;
- } else if (timeRegion.getKey() > currentRegionTime) {
- throw new IllegalStateException(
- String.format("Got out of order time %d when expecting time less than or equal to %d",
- timeRegion.getKey(), currentRegionTime));
- }
- regions.add(timeRegion.getValue());
+ TimeRegions timeRegions;
+ while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+ int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+ if (count != -1 && count == timeRegions.getRegions().size()) {
+ return timeRegions;
+ } else {
+ LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+ timeRegions.getTime(), count, timeRegions.getRegions().size()));
+ time = time - 1;
}
}
- return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+ return null;
}
}
+ @Nullable
+ private TimeRegions getNextSetOfTimeRegions(HTableInterface stateTable, long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+ long currentRegionTime = -1;
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Result next;
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ while ((next = scanner.next()) != null) {
+ Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+ // Stop if reached next time value
+ if (currentRegionTime == -1) {
+ currentRegionTime = timeRegion.getKey();
+ } else if (timeRegion.getKey() < currentRegionTime) {
+ break;
+ } else if (timeRegion.getKey() > currentRegionTime) {
+ throw new IllegalStateException(
+ String.format("Got out of order time %d when expecting time less than or equal to %d",
+ timeRegion.getKey(), currentRegionTime));
+ }
+ regions.add(timeRegion.getValue());
+ }
+ }
+ return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+ }
+
+ @VisibleForTesting
+ int getRegionCountForTime(HTableInterface stateTable, long time) throws IOException {
+ Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, REGION_TIME_COL);
+ Result result = stateTable.get(get);
+ byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+ return value == null ? -1 : Bytes.toInt(value);
+ }
+
/**
* Delete all the regions that were recorded for all times equal or less than the given time.
*
@@ -294,15 +345,15 @@
public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (HTableInterface stateTable = stateTableSupplier.get()) {
+ // Delete the regions
Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
- }
- }
+ // Delete the count
+ scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
}
}
@@ -356,14 +407,82 @@
Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ // --------------------------------------------------------
+ // ------- Methods for empty regions at a given time -------
+ // --------------------------------------------------------
+ // Key: 0x4<time><region-id>
+ // Col 'e': <empty byte array>
+ // --------------------------------------------------------
+
+ /**
+ * Save the given region as empty as of the given time.
+ *
+ * @param time time in milliseconds
+ * @param regionId region id
+ */
+ public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(time);
+ try (HTableInterface stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+ put.add(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return regions that were recorded as empty after the given time.
+ *
+ * @param time time in milliseconds
+ * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+ * and the empty regions after the given time
+ */
+ public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+ throws IOException {
+ SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (HTableInterface stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+ EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
try (ResultScanner scanner = stateTable.getScanner(scan)) {
Result next;
while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
+ byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+ if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+ emptyRegions.add(emptyRegion);
+ }
}
}
}
+ return Collections.unmodifiableSortedSet(emptyRegions);
+ }
+
+ /**
+ * Delete empty region records saved on or before the given time.
+ *
+ * @param time time in milliseconds
+ */
+ public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+ try (HTableInterface stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan();
+ scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ @VisibleForTesting
+ void deleteFromScan(HTableInterface stateTable, Scan scan) throws IOException {
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
}
private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
}
+ private byte[] makeTimeRegionCountKey(byte[] time) {
+ return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+ }
+
private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
}
@@ -391,6 +514,15 @@
return Maps.immutableEntry(time, regionName);
}
+ private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+ return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+ }
+
+ private byte[] getEmptyRegionFromKey(byte[] key) {
+ int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+ return Bytes.copy(key, prefixLen, key.length - prefixLen);
+ }
+
private long getInvertedTime(long time) {
return Long.MAX_VALUE - time;
}
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 80da8d8..021f1b2 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
/**
@@ -203,6 +204,8 @@
dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
}
@Override
@@ -295,26 +298,40 @@
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
long time = timeRegions.getTime();
- Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+ long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+ LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+ // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+ if (inactiveTransactionBound == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+ "and hence the data must be incomplete", time);
+ }
+ continue;
+ }
+
+ // Get the prune upper bounds for all the transactional regions
+ Map<byte[], Long> pruneUpperBoundRegions =
+ dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+ // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+ // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started on or before inactiveTransactionBoundTime
+ pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+ pruneUpperBoundRegions);
+
// If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
// across all regions
- if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
- long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
- LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
- // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
- if (inactiveTransactionBound != -1) {
- Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
- return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
- "and hence the data must be incomplete", time);
- }
- }
+ if (!transactionalRegions.isEmpty() &&
+ pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+ Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+ long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+ LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+ return pruneUpperBound;
} else {
if (LOG.isDebugEnabled()) {
- Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+ Sets.SetView<byte[]> difference =
+ Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
}
@@ -325,6 +342,28 @@
return -1;
}
+ private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+ SortedSet<byte[]> transactionalRegions,
+ Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+ long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+ SortedSet<byte[]> emptyRegions =
+ dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+ LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+ inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+ // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+ // for these empty regions as inactiveTransactionBound
+ Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+ for (byte[] emptyRegion : emptyRegions) {
+ if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+ pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+ }
+ }
+ return Collections.unmodifiableMap(pubWithEmptyRegions);
+ }
+
private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 5a86b4a..beed1ad 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
private final TableName tableName;
private final DataJanitorState dataJanitorState;
private final long pruneFlushInterval;
+ // Map of region name -> prune upper bound
private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+ // Map of region name -> time the region was found to be empty
+ private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
private long lastChecked;
+ @SuppressWarnings("WeakerAccess")
public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
this.tableName = tableName;
this.dataJanitorState = dataJanitorState;
this.pruneFlushInterval = pruneFlushInterval;
this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
}
+ @SuppressWarnings("WeakerAccess")
public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+ warnIfNotRunning(regionName);
// The number of entries in this map is bound by the number of regions in this region server and thus it will not
// grow indefinitely
pruneEntries.put(regionName, pruneUpperBound);
}
+ @SuppressWarnings("WeakerAccess")
+ public void persistRegionEmpty(byte[] regionName, long time) {
+ warnIfNotRunning(regionName);
+ // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+ // grow indefinitely
+ emptyRegions.put(regionName, time);
+ }
+
+ @SuppressWarnings("WeakerAccess")
public boolean isAlive() {
return flushThread != null && flushThread.isAlive();
}
@@ -86,13 +102,22 @@
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- while (pruneEntries.firstEntry() != null) {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
// We can now remove the entry only if the key and value match with what we wrote since it is
// possible that a new pruneUpperBound for the same key has been added
pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
}
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
@@ -115,4 +140,11 @@
flushThread.setDaemon(true);
flushThread.start();
}
+
+ private void warnIfNotRunning(byte[] regionName) {
+ if (!isRunning() || !isAlive()) {
+ LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+ Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+ }
+ }
}
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 3ae0423..14bf96c 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -161,6 +161,7 @@
}
// Verify saved regions
+ Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -168,20 +169,39 @@
dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+ // Now change the count stored for regions saved at time 0 and 30
+ try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+ }
+ // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
// Delete regions saved on or before time 30
dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
// Values on or before time 30 should be deleted
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Counts should be deleted for time on or before 30
+ try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+ }
// Values after time 30 should still exist
Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+ try (HTableInterface stateTable = connection.getTable(pruneStateTable)) {
+ Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+ }
}
@Test
public void testSaveInactiveTransactionBoundTime() throws Exception {
int maxTime = 100;
- // Nothing sould be present in the beginning
+ // Nothing should be present in the beginning
Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
// Save inactive transaction bounds for various time values
@@ -207,4 +227,59 @@
Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
}
+
+ @Test
+ public void testSaveEmptyRegions() throws Exception {
+ // Nothing should be present in the beginning
+ Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ byte[] region1 = Bytes.toBytes("region1");
+ byte[] region2 = Bytes.toBytes("region2");
+ byte[] region3 = Bytes.toBytes("region3");
+ byte[] region4 = Bytes.toBytes("region4");
+ SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+ // Now record some empty regions
+ dataJanitorState.saveEmptyRegionForTime(100, region1);
+ dataJanitorState.saveEmptyRegionForTime(110, region1);
+ dataJanitorState.saveEmptyRegionForTime(102, region2);
+ dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ Assert.assertEquals(toISet(region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+ Assert.assertEquals(toISet(region3),
+ dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+ // Delete empty regions on or before time 110
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+ // Now only region3 should remain
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ // Delete empty regions on or before time 150
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+ // Now nothing should remain
+ Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ }
+
+ private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+ ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+ for (byte[] arg : args) {
+ builder.add(arg);
+ }
+ return builder.build();
+ }
}
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 37f732c..e3f5c6b 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -74,8 +74,9 @@
private static TableName txDataTable1;
private static TableName pruneStateTable;
+ private static DataJanitorState dataJanitorState;
- private HConnection connection;
+ private static HConnection connection;
// Override AbstractHBaseTableTest.startMiniCluster to setup configuration
@BeforeClass
@@ -109,17 +110,25 @@
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ connection = HConnectionManager.createConnection(conf);
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public HTableInterface get() throws IOException {
+ return connection.getTable(pruneStateTable);
+ }
+ });
}
@AfterClass
public static void shutdownAfterClass() throws Exception {
+ connection.close();
hBaseAdmin.disableTable(txDataTable1);
hBaseAdmin.deleteTable(txDataTable1);
}
@Before
public void beforeTest() throws Exception {
- connection = HConnectionManager.createConnection(conf);
createPruneStateTable();
InMemoryTransactionStateCache.setTransactionSnapshot(null);
}
@@ -133,8 +142,12 @@
@After
public void afterTest() throws Exception {
+ // Disable the data table so that prune writer thread gets stopped,
+ // this makes sure that any cached value will not interfere with next test
+ hBaseAdmin.disableTable(txDataTable1);
deletePruneStateTable();
- connection.close();
+ // Enabling the table enables the prune writer thread again
+ hBaseAdmin.enableTable(txDataTable1);
}
private void deletePruneStateTable() throws Exception {
@@ -144,32 +157,8 @@
}
}
- private void truncatePruneStateTable() throws Exception {
- if (hBaseAdmin.tableExists(pruneStateTable)) {
- if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
- hBaseAdmin.disableTable(pruneStateTable);
- }
- hBaseAdmin.truncateTable(pruneStateTable, true);
- }
- }
-
@Test
public void testRecordCompactionState() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public HTableInterface get() throws IOException {
- return connection.getTable(pruneStateTable);
- }
- });
-
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -218,10 +207,6 @@
@Test
public void testRecordCompactionStateNoTable() throws Exception {
- // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
- // and make sure a major compaction succeeds
- deletePruneStateTable();
-
// Create a new transaction snapshot
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -253,24 +238,9 @@
@Test
public void testPruneUpperBound() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public HTableInterface get() throws IOException {
- return connection.getTable(pruneStateTable);
- }
- });
-
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
try {
// Run without a transaction snapshot first
long now1 = 200;
@@ -340,6 +310,87 @@
}
}
+ @Test
+ public void testPruneEmptyTable() throws Exception {
+ // Make sure that empty tables do not block the progress of pruning
+
+ // Create an empty table
+ TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+ HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.compact(txEmptyTable, true);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+ testUtil.flush(txEmptyTable);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, again, this time it should work
+ pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Now add some data to the empty table
+ // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+ emptyHTable.put(new Put(Bytes.toBytes(1)).add(family, qualifier, Bytes.toBytes(1)));
+ emptyHTable.close();
+
+ // Now run another compaction on txDataTable1 with an updated tx snapshot
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+ // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+ // empty in the previous run with inactiveTxTimeNow1
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+ // However, after compacting txEmptyTable we should get the latest upper bound
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txEmptyTable, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+ } finally {
+ transactionPruningPlugin.destroy();
+ hBaseAdmin.disableTable(txEmptyTable);
+ hBaseAdmin.deleteTable(txEmptyTable);
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true);
return regionLocation.getRegionInfo().getRegionName();
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9e9dd46..7485b91 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
}
@Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+ // Record whether the region is empty after a flush
+ HRegion region = e.getEnvironment().getRegion();
+ // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+ // then the region must be empty
+ long numStoreFiles = numStoreFilesForRegion(e);
+ long memstoreSize = region.getMemstoreSize().get();
+ LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+ region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+ if (memstoreSize == 0 && numStoreFiles == 0) {
+ if (pruneEnable == null) {
+ initPruneState(e);
+ }
+
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ compactionState.persistRegionEmpty(System.currentTimeMillis());
+ }
+ }
+
+ }
+
+ @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
@@ -318,25 +341,7 @@
TransactionVisibilityState snapshot = cache.getLatestState();
if (pruneEnable == null) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
- }
- }
- }
+ initPruneState(c);
}
if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
+ private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+ Configuration conf = getConfiguration(c.getEnvironment());
+ // Configuration won't be null in TransactionProcessor but the derived classes might return
+ // null if it is not available temporarily
+ if (conf != null) {
+ pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
+ }
+ }
+ }
+
+ private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+ long numStoreFiles = 0;
+ for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+ numStoreFiles += store.getStorefiles().size();
+ }
+ return numStoreFiles;
+ }
+
/**
* Filter used to include cells visible to in-progress transactions on flush and commit.
*/
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index db7880b..9b856d9 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
}
/**
+ * Persist that the given region is empty at the given time
+ * @param time time in milliseconds
+ */
+ public void persistRegionEmpty(long time) {
+ pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+ }
+ }
+
+ /**
* Releases the usage {@link PruneUpperBoundWriter}.
*/
public void stop() {
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 897e00e..fc0ec76 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
package org.apache.tephra.hbase.txprune;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,11 +54,14 @@
*/
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
+ private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
public static final byte[] FAMILY = {'f'};
public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+ private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
private static final byte[] REGION_KEY_PREFIX = {0x1};
private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ // This value can be used when we don't care about the value we write in a column
+ private static final byte[] COL_VAL = Bytes.toBytes('1');
private final TableSupplier stateTableSupplier;
@@ -148,7 +163,7 @@
for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
}
- return resultMap;
+ return Collections.unmodifiableMap(resultMap);
}
/**
@@ -181,7 +196,7 @@
}
}
}
- return regionPruneInfos;
+ return Collections.unmodifiableList(regionPruneInfos);
}
/**
@@ -223,7 +238,7 @@
// ---------------------------------------------------
// ------- Methods for regions at a given time -------
// ---------------------------------------------------
- // Key: 0x2<time><region-id>
+ // Key: 0x2<inverted time><region-id>
// Col 't': <empty byte array>
// ---------------------------------------------------
@@ -240,12 +255,22 @@
try (Table stateTable = stateTableSupplier.get()) {
for (byte[] region : regions) {
Put put = new Put(makeTimeRegionKey(timeBytes, region));
- put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+ put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
stateTable.put(put);
}
+
+ // Save the count of regions as a checksum
+ saveRegionCountForTime(stateTable, timeBytes, regions.size());
}
}
+ @VisibleForTesting
+ void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+ Put put = new Put(makeTimeRegionCountKey(timeBytes));
+ put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+ stateTable.put(put);
+ }
+
/**
* Return the set of regions saved for the time at or before the given time. This method finds the greatest time
* that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
*/
@Nullable
public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
- byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (Table stateTable = stateTableSupplier.get()) {
- Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
- scan.addColumn(FAMILY, REGION_TIME_COL);
-
- SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- long currentRegionTime = -1;
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
- // Stop if reached next time value
- if (currentRegionTime == -1) {
- currentRegionTime = timeRegion.getKey();
- } else if (timeRegion.getKey() < currentRegionTime) {
- break;
- } else if (timeRegion.getKey() > currentRegionTime) {
- throw new IllegalStateException(
- String.format("Got out of order time %d when expecting time less than or equal to %d",
- timeRegion.getKey(), currentRegionTime));
- }
- regions.add(timeRegion.getValue());
+ TimeRegions timeRegions;
+ while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+ int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+ if (count != -1 && count == timeRegions.getRegions().size()) {
+ return timeRegions;
+ } else {
+ LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+ timeRegions.getTime(), count, timeRegions.getRegions().size()));
+ time = time - 1;
}
}
- return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+ return null;
}
}
+ @Nullable
+ private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+ long currentRegionTime = -1;
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Result next;
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ while ((next = scanner.next()) != null) {
+ Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+ // Stop if reached next time value
+ if (currentRegionTime == -1) {
+ currentRegionTime = timeRegion.getKey();
+ } else if (timeRegion.getKey() < currentRegionTime) {
+ break;
+ } else if (timeRegion.getKey() > currentRegionTime) {
+ throw new IllegalStateException(
+ String.format("Got out of order time %d when expecting time less than or equal to %d",
+ timeRegion.getKey(), currentRegionTime));
+ }
+ regions.add(timeRegion.getValue());
+ }
+ }
+ return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+ }
+
+ @VisibleForTesting
+ int getRegionCountForTime(Table stateTable, long time) throws IOException {
+ Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, REGION_TIME_COL);
+ Result result = stateTable.get(get);
+ byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+ return value == null ? -1 : Bytes.toInt(value);
+ }
+
/**
* Delete all the regions that were recorded for all times equal or less than the given time.
*
@@ -294,15 +345,15 @@
public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (Table stateTable = stateTableSupplier.get()) {
+ // Delete the regions
Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
- }
- }
+ // Delete the count
+ scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
}
}
@@ -356,14 +407,82 @@
Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ // --------------------------------------------------------
+ // ------- Methods for empty regions at a given time -------
+ // --------------------------------------------------------
+ // Key: 0x4<time><region-id>
+ // Col 'e': <empty byte array>
+ // --------------------------------------------------------
+
+ /**
+ * Save the given region as empty as of the given time.
+ *
+ * @param time time in milliseconds
+ * @param regionId region id
+ */
+ public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(time);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+ put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return regions that were recorded as empty after the given time.
+ *
+ * @param time time in milliseconds
+ * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+ * and the empty regions after the given time
+ */
+ public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+ throws IOException {
+ SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+ EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
try (ResultScanner scanner = stateTable.getScanner(scan)) {
Result next;
while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
+ byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+ if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+ emptyRegions.add(emptyRegion);
+ }
}
}
}
+ return Collections.unmodifiableSortedSet(emptyRegions);
+ }
+
+ /**
+ * Delete empty region records saved on or before the given time.
+ *
+ * @param time time in milliseconds
+ */
+ public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan();
+ scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ @VisibleForTesting
+ void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
}
private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
}
+ private byte[] makeTimeRegionCountKey(byte[] time) {
+ return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+ }
+
private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
}
@@ -391,6 +514,15 @@
return Maps.immutableEntry(time, regionName);
}
+ private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+ return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+ }
+
+ private byte[] getEmptyRegionFromKey(byte[] key) {
+ int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+ return Bytes.copy(key, prefixLen, key.length - prefixLen);
+ }
+
private long getInvertedTime(long time) {
return Long.MAX_VALUE - time;
}
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 95216b9..84c480a 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
/**
@@ -200,6 +201,8 @@
dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
}
@Override
@@ -288,26 +291,40 @@
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
long time = timeRegions.getTime();
- Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+ long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+ LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+ // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+ if (inactiveTransactionBound == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+ "and hence the data must be incomplete", time);
+ }
+ continue;
+ }
+
+ // Get the prune upper bounds for all the transactional regions
+ Map<byte[], Long> pruneUpperBoundRegions =
+ dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+ // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+ // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started on or before inactiveTransactionBoundTime
+ pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+ pruneUpperBoundRegions);
+
// If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
// across all regions
- if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
- long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
- LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
- // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
- if (inactiveTransactionBound != -1) {
- Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
- return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
- "and hence the data must be incomplete", time);
- }
- }
+ if (!transactionalRegions.isEmpty() &&
+ pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+ Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+ long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+ LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+ return pruneUpperBound;
} else {
if (LOG.isDebugEnabled()) {
- Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+ Sets.SetView<byte[]> difference =
+ Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
}
@@ -318,6 +335,28 @@
return -1;
}
+ private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+ SortedSet<byte[]> transactionalRegions,
+ Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+ long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+ SortedSet<byte[]> emptyRegions =
+ dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+ LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+ inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+ // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+ // for these empty regions as inactiveTransactionBound
+ Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+ for (byte[] emptyRegion : emptyRegions) {
+ if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+ pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+ }
+ }
+ return Collections.unmodifiableMap(pubWithEmptyRegions);
+ }
+
private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 7e9d1a3..9773a15 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
private final TableName tableName;
private final DataJanitorState dataJanitorState;
private final long pruneFlushInterval;
+ // Map of region name -> prune upper bound
private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+ // Map of region name -> time the region was found to be empty
+ private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
private long lastChecked;
+ @SuppressWarnings("WeakerAccess")
public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
this.tableName = tableName;
this.dataJanitorState = dataJanitorState;
this.pruneFlushInterval = pruneFlushInterval;
this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
}
+ @SuppressWarnings("WeakerAccess")
public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+ warnIfNotRunning(regionName);
// The number of entries in this map is bound by the number of regions in this region server and thus it will not
// grow indefinitely
pruneEntries.put(regionName, pruneUpperBound);
}
+ @SuppressWarnings("WeakerAccess")
+ public void persistRegionEmpty(byte[] regionName, long time) {
+ warnIfNotRunning(regionName);
+ // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+ // grow indefinitely
+ emptyRegions.put(regionName, time);
+ }
+
+ @SuppressWarnings("WeakerAccess")
public boolean isAlive() {
return flushThread != null && flushThread.isAlive();
}
@@ -86,13 +102,22 @@
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- while (pruneEntries.firstEntry() != null) {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
// We can now remove the entry only if the key and value match with what we wrote since it is
// possible that a new pruneUpperBound for the same key has been added
pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
}
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNameWithNamespaceInclAsString(), ex);
@@ -115,4 +140,11 @@
flushThread.setDaemon(true);
flushThread.start();
}
+
+ private void warnIfNotRunning(byte[] regionName) {
+ if (!isRunning() || !isAlive()) {
+ LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+ Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+ }
+ }
}
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 402892f..b96d87d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -156,6 +156,7 @@
}
// Verify saved regions
+ Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -163,20 +164,39 @@
dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+ // Now change the count stored for regions saved at time 0 and 30
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+ }
+ // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
// Delete regions saved on or before time 30
dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
// Values on or before time 30 should be deleted
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Counts should be deleted for time on or before 30
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+ }
// Values after time 30 should still exist
Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+ }
}
@Test
public void testSaveInactiveTransactionBoundTime() throws Exception {
int maxTime = 100;
- // Nothing sould be present in the beginning
+ // Nothing should be present in the beginning
Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
// Save inactive transaction bounds for various time values
@@ -202,4 +222,59 @@
Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
}
+
+ @Test
+ public void testSaveEmptyRegions() throws Exception {
+ // Nothing should be present in the beginning
+ Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ byte[] region1 = Bytes.toBytes("region1");
+ byte[] region2 = Bytes.toBytes("region2");
+ byte[] region3 = Bytes.toBytes("region3");
+ byte[] region4 = Bytes.toBytes("region4");
+ SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+ // Now record some empty regions
+ dataJanitorState.saveEmptyRegionForTime(100, region1);
+ dataJanitorState.saveEmptyRegionForTime(110, region1);
+ dataJanitorState.saveEmptyRegionForTime(102, region2);
+ dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ Assert.assertEquals(toISet(region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+ Assert.assertEquals(toISet(region3),
+ dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+ // Delete empty regions on or before time 110
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+ // Now only region3 should remain
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ // Delete empty regions on or before time 150
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+ // Now nothing should remain
+ Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ }
+
+ private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+ ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+ for (byte[] arg : args) {
+ builder.add(arg);
+ }
+ return builder.build();
+ }
}
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index a431ee3..c99904b 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -72,6 +72,7 @@
private static TableName txDataTable1;
private static TableName pruneStateTable;
+ private static DataJanitorState dataJanitorState;
// Override AbstractHBaseTableTest.startMiniCluster to setup configuration
@BeforeClass
@@ -105,6 +106,14 @@
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
}
@AfterClass
@@ -128,7 +137,12 @@
@After
public void afterTest() throws Exception {
+ // Disable the data table so that prune writer thread gets stopped,
+ // this makes sure that any cached value will not interfere with next test
+ hBaseAdmin.disableTable(txDataTable1);
deletePruneStateTable();
+ // Enabling the table enables the prune writer thread again
+ hBaseAdmin.enableTable(txDataTable1);
}
private void deletePruneStateTable() throws Exception {
@@ -138,32 +152,8 @@
}
}
- private void truncatePruneStateTable() throws Exception {
- if (hBaseAdmin.tableExists(pruneStateTable)) {
- if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
- hBaseAdmin.disableTable(pruneStateTable);
- }
- hBaseAdmin.truncateTable(pruneStateTable, true);
- }
- }
-
@Test
public void testRecordCompactionState() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return testUtil.getConnection().getTable(pruneStateTable);
- }
- });
-
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -212,10 +202,6 @@
@Test
public void testRecordCompactionStateNoTable() throws Exception {
- // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
- // and make sure a major compaction succeeds
- deletePruneStateTable();
-
// Create a new transaction snapshot
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -247,24 +233,9 @@
@Test
public void testPruneUpperBound() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return testUtil.getConnection().getTable(pruneStateTable);
- }
- });
-
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
try {
// Run without a transaction snapshot first
long now1 = 200;
@@ -334,6 +305,87 @@
}
}
+ @Test
+ public void testPruneEmptyTable() throws Exception {
+ // Make sure that empty tables do not block the progress of pruning
+
+ // Create an empty table
+ TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+ HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.compact(txEmptyTable, true);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+ hBaseAdmin.flush(txEmptyTable);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, again, this time it should work
+ pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Now add some data to the empty table
+ // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+ emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+ emptyHTable.close();
+
+ // Now run another compaction on txDataTable1 with an updated tx snapshot
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+ // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+ // empty in the previous run with inactiveTxTimeNow1
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+ // However, after compacting txEmptyTable we should get the latest upper bound
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txEmptyTable, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+ } finally {
+ transactionPruningPlugin.destroy();
+ hBaseAdmin.disableTable(txEmptyTable);
+ hBaseAdmin.deleteTable(txEmptyTable);
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation =
testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9e9dd46..7485b91 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -310,6 +311,28 @@
}
@Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+ // Record whether the region is empty after a flush
+ HRegion region = e.getEnvironment().getRegion();
+ // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+ // then the region must be empty
+ long numStoreFiles = numStoreFilesForRegion(e);
+ long memstoreSize = region.getMemstoreSize().get();
+ LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+ region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+ if (memstoreSize == 0 && numStoreFiles == 0) {
+ if (pruneEnable == null) {
+ initPruneState(e);
+ }
+
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ compactionState.persistRegionEmpty(System.currentTimeMillis());
+ }
+ }
+
+ }
+
+ @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
@@ -318,25 +341,7 @@
TransactionVisibilityState snapshot = cache.getLatestState();
if (pruneEnable == null) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
- }
- }
- }
+ initPruneState(c);
}
if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
+ private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+ Configuration conf = getConfiguration(c.getEnvironment());
+ // Configuration won't be null in TransactionProcessor but the derived classes might return
+ // null if it is not available temporarily
+ if (conf != null) {
+ pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
+ }
+ }
+ }
+
+ private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+ long numStoreFiles = 0;
+ for (Store store : c.getEnvironment().getRegion().getStores().values()) {
+ numStoreFiles += store.getStorefiles().size();
+ }
+ return numStoreFiles;
+ }
+
/**
* Filter used to include cells visible to in-progress transactions on flush and commit.
*/
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index db7880b..9b856d9 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
}
/**
+ * Persist that the given region is empty at the given time
+ * @param time time in milliseconds
+ */
+ public void persistRegionEmpty(long time) {
+ pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+ }
+ }
+
+ /**
* Releases the usage {@link PruneUpperBoundWriter}.
*/
public void stop() {
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 897e00e..fc0ec76 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
package org.apache.tephra.hbase.txprune;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,11 +54,14 @@
*/
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
+ private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
public static final byte[] FAMILY = {'f'};
public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+ private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
private static final byte[] REGION_KEY_PREFIX = {0x1};
private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ // This value can be used when we don't care about the value we write in a column
+ private static final byte[] COL_VAL = Bytes.toBytes('1');
private final TableSupplier stateTableSupplier;
@@ -148,7 +163,7 @@
for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
}
- return resultMap;
+ return Collections.unmodifiableMap(resultMap);
}
/**
@@ -181,7 +196,7 @@
}
}
}
- return regionPruneInfos;
+ return Collections.unmodifiableList(regionPruneInfos);
}
/**
@@ -223,7 +238,7 @@
// ---------------------------------------------------
// ------- Methods for regions at a given time -------
// ---------------------------------------------------
- // Key: 0x2<time><region-id>
+ // Key: 0x2<inverted time><region-id>
// Col 't': <empty byte array>
// ---------------------------------------------------
@@ -240,12 +255,22 @@
try (Table stateTable = stateTableSupplier.get()) {
for (byte[] region : regions) {
Put put = new Put(makeTimeRegionKey(timeBytes, region));
- put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+ put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
stateTable.put(put);
}
+
+ // Save the count of regions as a checksum
+ saveRegionCountForTime(stateTable, timeBytes, regions.size());
}
}
+ @VisibleForTesting
+ void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+ Put put = new Put(makeTimeRegionCountKey(timeBytes));
+ put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+ stateTable.put(put);
+ }
+
/**
* Return the set of regions saved for the time at or before the given time. This method finds the greatest time
* that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
*/
@Nullable
public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
- byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (Table stateTable = stateTableSupplier.get()) {
- Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
- scan.addColumn(FAMILY, REGION_TIME_COL);
-
- SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- long currentRegionTime = -1;
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
- // Stop if reached next time value
- if (currentRegionTime == -1) {
- currentRegionTime = timeRegion.getKey();
- } else if (timeRegion.getKey() < currentRegionTime) {
- break;
- } else if (timeRegion.getKey() > currentRegionTime) {
- throw new IllegalStateException(
- String.format("Got out of order time %d when expecting time less than or equal to %d",
- timeRegion.getKey(), currentRegionTime));
- }
- regions.add(timeRegion.getValue());
+ TimeRegions timeRegions;
+ while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+ int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+ if (count != -1 && count == timeRegions.getRegions().size()) {
+ return timeRegions;
+ } else {
+ LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+ timeRegions.getTime(), count, timeRegions.getRegions().size()));
+ time = time - 1;
}
}
- return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+ return null;
}
}
+ @Nullable
+ private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+ long currentRegionTime = -1;
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Result next;
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ while ((next = scanner.next()) != null) {
+ Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+ // Stop if reached next time value
+ if (currentRegionTime == -1) {
+ currentRegionTime = timeRegion.getKey();
+ } else if (timeRegion.getKey() < currentRegionTime) {
+ break;
+ } else if (timeRegion.getKey() > currentRegionTime) {
+ throw new IllegalStateException(
+ String.format("Got out of order time %d when expecting time less than or equal to %d",
+ timeRegion.getKey(), currentRegionTime));
+ }
+ regions.add(timeRegion.getValue());
+ }
+ }
+ return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+ }
+
+ @VisibleForTesting
+ int getRegionCountForTime(Table stateTable, long time) throws IOException {
+ Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, REGION_TIME_COL);
+ Result result = stateTable.get(get);
+ byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+ return value == null ? -1 : Bytes.toInt(value);
+ }
+
/**
* Delete all the regions that were recorded for all times equal or less than the given time.
*
@@ -294,15 +345,15 @@
public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (Table stateTable = stateTableSupplier.get()) {
+ // Delete the regions
Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
- }
- }
+ // Delete the count
+ scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
}
}
@@ -356,14 +407,82 @@
Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ // --------------------------------------------------------
+ // ------- Methods for empty regions at a given time -------
+ // --------------------------------------------------------
+ // Key: 0x4<time><region-id>
+ // Col 'e': <empty byte array>
+ // --------------------------------------------------------
+
+ /**
+ * Save the given region as empty as of the given time.
+ *
+ * @param time time in milliseconds
+ * @param regionId region id
+ */
+ public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(time);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+ put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return regions that were recorded as empty after the given time.
+ *
+ * @param time time in milliseconds
+ * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+ * and the empty regions after the given time
+ */
+ public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+ throws IOException {
+ SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+ EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
try (ResultScanner scanner = stateTable.getScanner(scan)) {
Result next;
while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
+ byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+ if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+ emptyRegions.add(emptyRegion);
+ }
}
}
}
+ return Collections.unmodifiableSortedSet(emptyRegions);
+ }
+
+ /**
+ * Delete empty region records saved on or before the given time.
+ *
+ * @param time time in milliseconds
+ */
+ public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan();
+ scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ @VisibleForTesting
+ void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
}
private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
}
+ private byte[] makeTimeRegionCountKey(byte[] time) {
+ return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+ }
+
private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
}
@@ -391,6 +514,15 @@
return Maps.immutableEntry(time, regionName);
}
+ private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+ return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+ }
+
+ private byte[] getEmptyRegionFromKey(byte[] key) {
+ int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+ return Bytes.copy(key, prefixLen, key.length - prefixLen);
+ }
+
private long getInvertedTime(long time) {
return Long.MAX_VALUE - time;
}
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index a63cf75..8142601 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
/**
@@ -201,6 +202,8 @@
dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
}
@Override
@@ -289,26 +292,40 @@
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
long time = timeRegions.getTime();
- Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+ long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+ LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+ // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+ if (inactiveTransactionBound == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+ "and hence the data must be incomplete", time);
+ }
+ continue;
+ }
+
+ // Get the prune upper bounds for all the transactional regions
+ Map<byte[], Long> pruneUpperBoundRegions =
+ dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+ // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+ // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started on or before inactiveTransactionBoundTime
+ pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+ pruneUpperBoundRegions);
+
// If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
// across all regions
- if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
- long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
- LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
- // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
- if (inactiveTransactionBound != -1) {
- Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
- return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
- "and hence the data must be incomplete", time);
- }
- }
+ if (!transactionalRegions.isEmpty() &&
+ pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+ Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+ long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+ LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+ return pruneUpperBound;
} else {
if (LOG.isDebugEnabled()) {
- Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+ Sets.SetView<byte[]> difference =
+ Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
}
@@ -319,6 +336,28 @@
return -1;
}
+ private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+ SortedSet<byte[]> transactionalRegions,
+ Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+ long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+ SortedSet<byte[]> emptyRegions =
+ dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+ LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+ inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+ // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+ // for these empty regions as inactiveTransactionBound
+ Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+ for (byte[] emptyRegion : emptyRegions) {
+ if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+ pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+ }
+ }
+ return Collections.unmodifiableMap(pubWithEmptyRegions);
+ }
+
private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 5a86b4a..beed1ad 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
private final TableName tableName;
private final DataJanitorState dataJanitorState;
private final long pruneFlushInterval;
+ // Map of region name -> prune upper bound
private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+ // Map of region name -> time the region was found to be empty
+ private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
private long lastChecked;
+ @SuppressWarnings("WeakerAccess")
public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
this.tableName = tableName;
this.dataJanitorState = dataJanitorState;
this.pruneFlushInterval = pruneFlushInterval;
this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
}
+ @SuppressWarnings("WeakerAccess")
public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+ warnIfNotRunning(regionName);
// The number of entries in this map is bound by the number of regions in this region server and thus it will not
// grow indefinitely
pruneEntries.put(regionName, pruneUpperBound);
}
+ @SuppressWarnings("WeakerAccess")
+ public void persistRegionEmpty(byte[] regionName, long time) {
+ warnIfNotRunning(regionName);
+ // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+ // grow indefinitely
+ emptyRegions.put(regionName, time);
+ }
+
+ @SuppressWarnings("WeakerAccess")
public boolean isAlive() {
return flushThread != null && flushThread.isAlive();
}
@@ -86,13 +102,22 @@
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- while (pruneEntries.firstEntry() != null) {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
// We can now remove the entry only if the key and value match with what we wrote since it is
// possible that a new pruneUpperBound for the same key has been added
pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
}
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
@@ -115,4 +140,11 @@
flushThread.setDaemon(true);
flushThread.start();
}
+
+ private void warnIfNotRunning(byte[] regionName) {
+ if (!isRunning() || !isAlive()) {
+ LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+ Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+ }
+ }
}
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 402892f..b96d87d 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -156,6 +156,7 @@
}
// Verify saved regions
+ Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -163,20 +164,39 @@
dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+ // Now change the count stored for regions saved at time 0 and 30
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+ }
+ // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
// Delete regions saved on or before time 30
dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
// Values on or before time 30 should be deleted
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Counts should be deleted for time on or before 30
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+ }
// Values after time 30 should still exist
Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+ }
}
@Test
public void testSaveInactiveTransactionBoundTime() throws Exception {
int maxTime = 100;
- // Nothing sould be present in the beginning
+ // Nothing should be present in the beginning
Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
// Save inactive transaction bounds for various time values
@@ -202,4 +222,59 @@
Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
}
+
+ @Test
+ public void testSaveEmptyRegions() throws Exception {
+ // Nothing should be present in the beginning
+ Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ byte[] region1 = Bytes.toBytes("region1");
+ byte[] region2 = Bytes.toBytes("region2");
+ byte[] region3 = Bytes.toBytes("region3");
+ byte[] region4 = Bytes.toBytes("region4");
+ SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+ // Now record some empty regions
+ dataJanitorState.saveEmptyRegionForTime(100, region1);
+ dataJanitorState.saveEmptyRegionForTime(110, region1);
+ dataJanitorState.saveEmptyRegionForTime(102, region2);
+ dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ Assert.assertEquals(toISet(region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+ Assert.assertEquals(toISet(region3),
+ dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+ // Delete empty regions on or before time 110
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+ // Now only region3 should remain
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ // Delete empty regions on or before time 150
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+ // Now nothing should remain
+ Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ }
+
+ private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+ ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+ for (byte[] arg : args) {
+ builder.add(arg);
+ }
+ return builder.build();
+ }
}
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index a431ee3..07746d8 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -72,6 +72,7 @@
private static TableName txDataTable1;
private static TableName pruneStateTable;
+ private static DataJanitorState dataJanitorState;
// Override AbstractHBaseTableTest.startMiniCluster to setup configuration
@BeforeClass
@@ -105,6 +106,14 @@
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
}
@AfterClass
@@ -128,7 +137,12 @@
@After
public void afterTest() throws Exception {
+ // Disable the data table so that prune writer thread gets stopped,
+ // this makes sure that any cached value will not interfere with next test
+ hBaseAdmin.disableTable(txDataTable1);
deletePruneStateTable();
+ // Enabling the table enables the prune writer thread again
+ hBaseAdmin.enableTable(txDataTable1);
}
private void deletePruneStateTable() throws Exception {
@@ -138,32 +152,8 @@
}
}
- private void truncatePruneStateTable() throws Exception {
- if (hBaseAdmin.tableExists(pruneStateTable)) {
- if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
- hBaseAdmin.disableTable(pruneStateTable);
- }
- hBaseAdmin.truncateTable(pruneStateTable, true);
- }
- }
-
@Test
public void testRecordCompactionState() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return testUtil.getConnection().getTable(pruneStateTable);
- }
- });
-
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -212,10 +202,6 @@
@Test
public void testRecordCompactionStateNoTable() throws Exception {
- // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
- // and make sure a major compaction succeeds
- deletePruneStateTable();
-
// Create a new transaction snapshot
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -247,24 +233,9 @@
@Test
public void testPruneUpperBound() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return testUtil.getConnection().getTable(pruneStateTable);
- }
- });
-
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
try {
// Run without a transaction snapshot first
long now1 = 200;
@@ -334,6 +305,87 @@
}
}
+ @Test
+ public void testPruneEmptyTable() throws Exception {
+ // Make sure that empty tables do not block the progress of pruning
+
+ // Create an empty table
+ TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+ HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.compact(txEmptyTable, true);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+ hBaseAdmin.flush(txEmptyTable);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, again, this time it should work
+ pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Now add some data to the empty table
+ // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+ emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+ emptyHTable.close();
+
+ // Now run another compaction on txDataTable1 with an updated tx snapshot
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+ // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+ // empty in the previous run with inactiveTxTimeNow1
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+ // However, after compacting txEmptyTable we should get the latest upper bound
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txEmptyTable, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+ } finally {
+ transactionPruningPlugin.destroy();
+ hBaseAdmin.disableTable(txEmptyTable);
+ hBaseAdmin.deleteTable(txEmptyTable);
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation =
testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 015077b..5e1b4c5 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -310,6 +311,28 @@
}
@Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+ // Record whether the region is empty after a flush
+ Region region = e.getEnvironment().getRegion();
+ // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+ // then the region must be empty
+ long numStoreFiles = numStoreFilesForRegion(e);
+ long memstoreSize = region.getMemstoreSize();
+ LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+ region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+ if (memstoreSize == 0 && numStoreFiles == 0) {
+ if (pruneEnable == null) {
+ initPruneState(e);
+ }
+
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ compactionState.persistRegionEmpty(System.currentTimeMillis());
+ }
+ }
+
+ }
+
+ @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
@@ -318,25 +341,7 @@
TransactionVisibilityState snapshot = cache.getLatestState();
if (pruneEnable == null) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
- }
- }
- }
+ initPruneState(c);
}
if (Boolean.TRUE.equals(pruneEnable)) {
@@ -449,6 +454,36 @@
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
+ private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
+ Configuration conf = getConfiguration(c.getEnvironment());
+ // Configuration won't be null in TransactionProcessor but the derived classes might return
+ // null if it is not available temporarily
+ if (conf != null) {
+ pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
+ }
+ }
+ }
+
+ private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+ long numStoreFiles = 0;
+ for (Store store : c.getEnvironment().getRegion().getStores()) {
+ numStoreFiles += store.getStorefiles().size();
+ }
+ return numStoreFiles;
+ }
+
/**
* Filter used to include cells visible to in-progress transactions on flush and commit.
*/
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index db7880b..9b856d9 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -93,6 +93,17 @@
}
/**
+ * Persist that the given region is empty at the given time
+ * @param time time in milliseconds
+ */
+ public void persistRegionEmpty(long time) {
+ pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+ }
+ }
+
+ /**
* Releases the usage {@link PruneUpperBoundWriter}.
*/
public void stop() {
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index 897e00e..fc0ec76 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -19,7 +19,10 @@
package org.apache.tephra.hbase.txprune;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
@@ -35,6 +38,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,11 +54,14 @@
*/
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
+ private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
public static final byte[] FAMILY = {'f'};
public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+ private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
private static final byte[] REGION_KEY_PREFIX = {0x1};
private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
@@ -65,7 +72,15 @@
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ // This value can be used when we don't care about the value we write in a column
+ private static final byte[] COL_VAL = Bytes.toBytes('1');
private final TableSupplier stateTableSupplier;
@@ -148,7 +163,7 @@
for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
}
- return resultMap;
+ return Collections.unmodifiableMap(resultMap);
}
/**
@@ -181,7 +196,7 @@
}
}
}
- return regionPruneInfos;
+ return Collections.unmodifiableList(regionPruneInfos);
}
/**
@@ -223,7 +238,7 @@
// ---------------------------------------------------
// ------- Methods for regions at a given time -------
// ---------------------------------------------------
- // Key: 0x2<time><region-id>
+ // Key: 0x2<inverted time><region-id>
// Col 't': <empty byte array>
// ---------------------------------------------------
@@ -240,12 +255,22 @@
try (Table stateTable = stateTableSupplier.get()) {
for (byte[] region : regions) {
Put put = new Put(makeTimeRegionKey(timeBytes, region));
- put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+ put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
stateTable.put(put);
}
+
+ // Save the count of regions as a checksum
+ saveRegionCountForTime(stateTable, timeBytes, regions.size());
}
}
+ @VisibleForTesting
+ void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+ Put put = new Put(makeTimeRegionCountKey(timeBytes));
+ put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+ stateTable.put(put);
+ }
+
/**
* Return the set of regions saved for the time at or before the given time. This method finds the greatest time
* that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
@@ -257,34 +282,60 @@
*/
@Nullable
public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
- byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (Table stateTable = stateTableSupplier.get()) {
- Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
- scan.addColumn(FAMILY, REGION_TIME_COL);
-
- SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- long currentRegionTime = -1;
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
- // Stop if reached next time value
- if (currentRegionTime == -1) {
- currentRegionTime = timeRegion.getKey();
- } else if (timeRegion.getKey() < currentRegionTime) {
- break;
- } else if (timeRegion.getKey() > currentRegionTime) {
- throw new IllegalStateException(
- String.format("Got out of order time %d when expecting time less than or equal to %d",
- timeRegion.getKey(), currentRegionTime));
- }
- regions.add(timeRegion.getValue());
+ TimeRegions timeRegions;
+ while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+ int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+ if (count != -1 && count == timeRegions.getRegions().size()) {
+ return timeRegions;
+ } else {
+ LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+ timeRegions.getTime(), count, timeRegions.getRegions().size()));
+ time = time - 1;
}
}
- return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+ return null;
}
}
+ @Nullable
+ private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+ long currentRegionTime = -1;
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Result next;
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ while ((next = scanner.next()) != null) {
+ Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+ // Stop if reached next time value
+ if (currentRegionTime == -1) {
+ currentRegionTime = timeRegion.getKey();
+ } else if (timeRegion.getKey() < currentRegionTime) {
+ break;
+ } else if (timeRegion.getKey() > currentRegionTime) {
+ throw new IllegalStateException(
+ String.format("Got out of order time %d when expecting time less than or equal to %d",
+ timeRegion.getKey(), currentRegionTime));
+ }
+ regions.add(timeRegion.getValue());
+ }
+ }
+ return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+ }
+
+ @VisibleForTesting
+ int getRegionCountForTime(Table stateTable, long time) throws IOException {
+ Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, REGION_TIME_COL);
+ Result result = stateTable.get(get);
+ byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+ return value == null ? -1 : Bytes.toInt(value);
+ }
+
/**
* Delete all the regions that were recorded for all times equal or less than the given time.
*
@@ -294,15 +345,15 @@
public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
try (Table stateTable = stateTableSupplier.get()) {
+ // Delete the regions
Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
- try (ResultScanner scanner = stateTable.getScanner(scan)) {
- Result next;
- while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
- }
- }
+ // Delete the count
+ scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
}
}
@@ -356,14 +407,82 @@
Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ // --------------------------------------------------------
+ // ------- Methods for empty regions at a given time -------
+ // --------------------------------------------------------
+ // Key: 0x4<time><region-id>
+ // Col 'e': <empty byte array>
+ // --------------------------------------------------------
+
+ /**
+ * Save the given region as empty as of the given time.
+ *
+ * @param time time in milliseconds
+ * @param regionId region id
+ */
+ public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(time);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+ put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return regions that were recorded as empty after the given time.
+ *
+ * @param time time in milliseconds
+ * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+ * and the empty regions after the given time
+ */
+ public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+ throws IOException {
+ SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+ EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
try (ResultScanner scanner = stateTable.getScanner(scan)) {
Result next;
while ((next = scanner.next()) != null) {
- stateTable.delete(new Delete(next.getRow()));
+ byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+ if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+ emptyRegions.add(emptyRegion);
+ }
}
}
}
+ return Collections.unmodifiableSortedSet(emptyRegions);
+ }
+
+ /**
+ * Delete empty region records saved on or before the given time.
+ *
+ * @param time time in milliseconds
+ */
+ public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan();
+ scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ @VisibleForTesting
+ void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
}
private byte[] makeRegionKey(byte[] regionId) {
@@ -379,6 +498,10 @@
return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
}
+ private byte[] makeTimeRegionCountKey(byte[] time) {
+ return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+ }
+
private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
}
@@ -391,6 +514,15 @@
return Maps.immutableEntry(time, regionName);
}
+ private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+ return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+ }
+
+ private byte[] getEmptyRegionFromKey(byte[] key) {
+ int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+ return Bytes.copy(key, prefixLen, key.length - prefixLen);
+ }
+
private long getInvertedTime(long time) {
return Long.MAX_VALUE - time;
}
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 99c514f..84c480a 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
/**
@@ -121,7 +122,7 @@
public void initialize(Configuration conf) throws IOException {
this.conf = conf;
this.connection = ConnectionFactory.createConnection(conf);
-
+
final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
@@ -200,6 +201,8 @@
dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
}
@Override
@@ -288,26 +291,40 @@
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
long time = timeRegions.getTime();
- Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+ long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+ LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+ // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+ if (inactiveTransactionBound == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+ "and hence the data must be incomplete", time);
+ }
+ continue;
+ }
+
+ // Get the prune upper bounds for all the transactional regions
+ Map<byte[], Long> pruneUpperBoundRegions =
+ dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+ // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+ // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started on or before inactiveTransactionBoundTime
+ pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+ pruneUpperBoundRegions);
+
// If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
// across all regions
- if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
- long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
- LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
- // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
- if (inactiveTransactionBound != -1) {
- Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
- return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
- "and hence the data must be incomplete", time);
- }
- }
+ if (!transactionalRegions.isEmpty() &&
+ pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+ Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+ long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+ LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+ return pruneUpperBound;
} else {
if (LOG.isDebugEnabled()) {
- Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+ Sets.SetView<byte[]> difference =
+ Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
}
@@ -318,6 +335,28 @@
return -1;
}
+ private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+ SortedSet<byte[]> transactionalRegions,
+ Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+ long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+ SortedSet<byte[]> emptyRegions =
+ dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+ LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+ inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+ // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+ // for these empty regions as inactiveTransactionBound
+ Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+ for (byte[] emptyRegion : emptyRegions) {
+ if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+ pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+ }
+ }
+ return Collections.unmodifiableMap(pubWithEmptyRegions);
+ }
+
private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got region - prune upper bound map: {}",
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 7e9d1a3..9773a15 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -39,25 +39,41 @@
private final TableName tableName;
private final DataJanitorState dataJanitorState;
private final long pruneFlushInterval;
+ // Map of region name -> prune upper bound
private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+ // Map of region name -> time the region was found to be empty
+ private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
private long lastChecked;
+ @SuppressWarnings("WeakerAccess")
public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
this.tableName = tableName;
this.dataJanitorState = dataJanitorState;
this.pruneFlushInterval = pruneFlushInterval;
this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
}
+ @SuppressWarnings("WeakerAccess")
public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+ warnIfNotRunning(regionName);
// The number of entries in this map is bound by the number of regions in this region server and thus it will not
// grow indefinitely
pruneEntries.put(regionName, pruneUpperBound);
}
+ @SuppressWarnings("WeakerAccess")
+ public void persistRegionEmpty(byte[] regionName, long time) {
+ warnIfNotRunning(regionName);
+ // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+ // grow indefinitely
+ emptyRegions.put(regionName, time);
+ }
+
+ @SuppressWarnings("WeakerAccess")
public boolean isAlive() {
return flushThread != null && flushThread.isAlive();
}
@@ -86,13 +102,22 @@
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- while (pruneEntries.firstEntry() != null) {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
// We can now remove the entry only if the key and value match with what we wrote since it is
// possible that a new pruneUpperBound for the same key has been added
pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
}
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNameWithNamespaceInclAsString(), ex);
@@ -115,4 +140,11 @@
flushThread.setDaemon(true);
flushThread.start();
}
+
+ private void warnIfNotRunning(byte[] regionName) {
+ if (!isRunning() || !isAlive()) {
+ LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+ Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+ }
+ }
}
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 402892f..b96d87d 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -156,6 +156,7 @@
}
// Verify saved regions
+ Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
@@ -163,20 +164,39 @@
dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+ // Now change the count stored for regions saved at time 0 and 30
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+ }
+ // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+
// Delete regions saved on or before time 30
dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
// Values on or before time 30 should be deleted
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Counts should be deleted for time on or before 30
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+ }
// Values after time 30 should still exist
Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+ }
}
@Test
public void testSaveInactiveTransactionBoundTime() throws Exception {
int maxTime = 100;
- // Nothing sould be present in the beginning
+ // Nothing should be present in the beginning
Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
// Save inactive transaction bounds for various time values
@@ -202,4 +222,59 @@
Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
}
+
+ @Test
+ public void testSaveEmptyRegions() throws Exception {
+ // Nothing should be present in the beginning
+ Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ byte[] region1 = Bytes.toBytes("region1");
+ byte[] region2 = Bytes.toBytes("region2");
+ byte[] region3 = Bytes.toBytes("region3");
+ byte[] region4 = Bytes.toBytes("region4");
+ SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+ // Now record some empty regions
+ dataJanitorState.saveEmptyRegionForTime(100, region1);
+ dataJanitorState.saveEmptyRegionForTime(110, region1);
+ dataJanitorState.saveEmptyRegionForTime(102, region2);
+ dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ Assert.assertEquals(toISet(region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+ Assert.assertEquals(toISet(region3),
+ dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+ // Delete empty regions on or before time 110
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+ // Now only region3 should remain
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ // Delete empty regions on or before time 150
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+ // Now nothing should remain
+ Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ }
+
+ private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+ ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+ for (byte[] arg : args) {
+ builder.add(arg);
+ }
+ return builder.build();
+ }
}
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index a431ee3..07746d8 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -72,6 +72,7 @@
private static TableName txDataTable1;
private static TableName pruneStateTable;
+ private static DataJanitorState dataJanitorState;
// Override AbstractHBaseTableTest.startMiniCluster to setup configuration
@BeforeClass
@@ -105,6 +106,14 @@
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
}
@AfterClass
@@ -128,7 +137,12 @@
@After
public void afterTest() throws Exception {
+ // Disable the data table so that prune writer thread gets stopped,
+ // this makes sure that any cached value will not interfere with next test
+ hBaseAdmin.disableTable(txDataTable1);
deletePruneStateTable();
+ // Enabling the table enables the prune writer thread again
+ hBaseAdmin.enableTable(txDataTable1);
}
private void deletePruneStateTable() throws Exception {
@@ -138,32 +152,8 @@
}
}
- private void truncatePruneStateTable() throws Exception {
- if (hBaseAdmin.tableExists(pruneStateTable)) {
- if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
- hBaseAdmin.disableTable(pruneStateTable);
- }
- hBaseAdmin.truncateTable(pruneStateTable, true);
- }
- }
-
@Test
public void testRecordCompactionState() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return testUtil.getConnection().getTable(pruneStateTable);
- }
- });
-
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -212,10 +202,6 @@
@Test
public void testRecordCompactionStateNoTable() throws Exception {
- // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
- // and make sure a major compaction succeeds
- deletePruneStateTable();
-
// Create a new transaction snapshot
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
@@ -247,24 +233,9 @@
@Test
public void testPruneUpperBound() throws Exception {
- DataJanitorState dataJanitorState =
- new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return testUtil.getConnection().getTable(pruneStateTable);
- }
- });
-
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
- // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
- TimeUnit.SECONDS.sleep(2);
- // Truncate prune state table to clear any data that might have been written by the previous test
- // This is required because during the shutdown of the previous test, compaction might have kicked in and the
- // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
- truncatePruneStateTable();
-
try {
// Run without a transaction snapshot first
long now1 = 200;
@@ -334,6 +305,87 @@
}
}
+ @Test
+ public void testPruneEmptyTable() throws Exception {
+ // Make sure that empty tables do not block the progress of pruning
+
+ // Create an empty table
+ TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+ HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.compact(txEmptyTable, true);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+ hBaseAdmin.flush(txEmptyTable);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, again, this time it should work
+ pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Now add some data to the empty table
+ // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+ emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+ emptyHTable.close();
+
+ // Now run another compaction on txDataTable1 with an updated tx snapshot
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+ // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+ // empty in the previous run with inactiveTxTimeNow1
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+ // However, after compacting txEmptyTable we should get the latest upper bound
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txEmptyTable, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+ } finally {
+ transactionPruningPlugin.destroy();
+ hBaseAdmin.disableTable(txEmptyTable);
+ hBaseAdmin.deleteTable(txEmptyTable);
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation =
testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);