Revert "TEPHRA-263 Enforce TTL, regardless of any in-progress transactions. Also Handle the case where TTL is longer than the duration from beginning of epoch to now."
This reverts commit a346efe7560cc3b96349263669c4e15951ad401b.
This closes #66
Signed-off-by: poorna <poorna@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
index 5eb2b3e..aaca23d 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
@@ -19,8 +19,6 @@
package org.apache.tephra.util;
import com.google.common.primitives.Longs;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionType;
@@ -63,7 +61,9 @@
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
- return getOldestVisibleTimestamp(ttlByFamily, tx, false);
+ long maxTTL = getMaxTTL(ttlByFamily);
+ // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
+ return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0;
}
/**
@@ -75,28 +75,12 @@
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
- long maxTTL = getMaxTTL(ttlByFamily);
- if (maxTTL == Long.MAX_VALUE) {
- return 0;
+ if (readNonTxnData) {
+ long maxTTL = getMaxTTL(ttlByFamily);
+ return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0;
}
- return getOldestVisibleTimestamp(maxTTL, tx, readNonTxnData);
- }
- /**
- * Returns the oldest visible timestamp for the given transaction, based on the TTL configured. If the TTL is
- * negative or zero, the oldest visible timestamp will be {@code 0}.
- * @param ttl TTL value (in milliseconds)
- * @param tx The current transaction
- * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
- * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
- */
- public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean readNonTxnData) {
- if (ttl <= 0) {
- return 0;
- }
- long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS;
- // if the computed ttl is negative, return 0 because timestamps can not be negative
- return Math.max(0, tx.getTransactionId() - ttl * ttlFactor);
+ return getOldestVisibleTimestamp(ttlByFamily, tx);
}
/**
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
index b172e89..db687fe 100644
--- a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
@@ -19,14 +19,9 @@
package org.apache.tephra.util;
import org.apache.tephra.Transaction;
-import org.apache.tephra.TxConstants;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertEquals;
/**
@@ -59,34 +54,4 @@
tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx));
}
-
- @Test
- public void testTTL() {
- long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
- byte[] family = new byte[] { 'd' };
- long currentTxTimeSeconds = 100;
- Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
- new long[] {10 * txIdsPerSecond, 30 * txIdsPerSecond},
- new long[] {80 * txIdsPerSecond, 90 * txIdsPerSecond},
- 80 * txIdsPerSecond);
- int ttlSeconds = 60;
- Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
- // ttl should only be impacted by the current transaction's id, and not by any older, in-progress transactions
- Assert.assertEquals((currentTxTimeSeconds - ttlSeconds) * txIdsPerSecond,
- TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
- }
-
- @Test
- public void testLargeTTL() {
- long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
- byte[] family = new byte[] { 'd' };
- long currentTxTimeSeconds = 100;
- Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
- new long[] { }, new long[] { }, 100);
- // ~100 years, so that the computed start timestamp is prior to 0 (epoch)
- long ttlSeconds = TimeUnit.DAYS.toSeconds(365 * 100);
- Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
- // oldest visible timestamp should be 0, not negative, because HBase timestamps can not be negative
- Assert.assertEquals(0, TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
- }
}
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
index c46082f..0ca9f9c 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -40,8 +40,7 @@
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
- scanType, null));
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}
/**
@@ -51,15 +50,13 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
- scanType, cellFilter));
+ ScanType scanType, @Nullable Filter cellFilter) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index d78ccea..10ecfa4 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -446,7 +446,7 @@
* @param type the type of scan operation being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
- return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
+ return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
/**
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index d24f504..3675268 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -70,7 +70,7 @@
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}
/**
@@ -80,20 +80,19 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
- public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
+ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
- TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index 7286a2a..4d34ed9 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -74,7 +74,7 @@
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -294,9 +294,8 @@
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);
- long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
- // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
- Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
+ Transaction tx = txManager.startShort();
+ long now = tx.getVisibilityUpperBound();
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
@@ -354,7 +353,7 @@
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
- super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
+ super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}
@Override
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
index c46082f..0ca9f9c 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -40,8 +40,7 @@
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
- scanType, null));
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}
/**
@@ -51,15 +50,13 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
- scanType, cellFilter));
+ ScanType scanType, @Nullable Filter cellFilter) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 435ae02..17d55a4 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -446,7 +446,7 @@
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
- return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
+ return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
/**
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index 3e3db59..9a617a9 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -70,7 +70,7 @@
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}
/**
@@ -80,20 +80,19 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
+ ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
- TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index 49de0eb..3352eef 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -74,7 +74,7 @@
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -293,9 +293,8 @@
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);
- long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
- // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
- Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
+ Transaction tx = txManager.startShort();
+ long now = tx.getVisibilityUpperBound();
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
@@ -353,7 +352,7 @@
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
- super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
+ super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}
@Override
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
index c46082f..0ca9f9c 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -40,8 +40,7 @@
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
- scanType, null));
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}
/**
@@ -51,15 +50,13 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
- scanType, cellFilter));
+ ScanType scanType, @Nullable Filter cellFilter) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index ab2ac8d..ca96052 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -446,7 +446,7 @@
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
- return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
+ return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
/**
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index 9e32478..9825ded 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -73,7 +73,7 @@
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}
/**
@@ -83,20 +83,19 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
+ ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
- TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index d4e4ed1..c27a10d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -74,7 +74,7 @@
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -293,9 +293,8 @@
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);
- long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
- // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
- Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
+ Transaction tx = txManager.startShort();
+ long now = tx.getVisibilityUpperBound();
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
@@ -353,7 +352,7 @@
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
- super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
+ super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}
@Override
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
index c46082f..0ca9f9c 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -40,8 +40,7 @@
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
- scanType, null));
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}
/**
@@ -51,15 +50,13 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
- scanType, cellFilter));
+ ScanType scanType, @Nullable Filter cellFilter) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 7325a7a..263ee98 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -446,7 +446,7 @@
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
- return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
+ return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
/**
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index 9e32478..9825ded 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -73,7 +73,7 @@
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}
/**
@@ -83,20 +83,19 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
+ ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
- TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index 28dfba8..4b2b40c 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -74,7 +74,7 @@
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -293,9 +293,8 @@
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);
- long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
- // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
- Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
+ Transaction tx = txManager.startShort();
+ long now = tx.getVisibilityUpperBound();
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
@@ -353,7 +352,7 @@
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
- super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
+ super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}
@Override
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
index c46082f..0ca9f9c 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -40,8 +40,7 @@
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
- scanType, null));
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}
/**
@@ -51,15 +50,13 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
- scanType, cellFilter));
+ ScanType scanType, @Nullable Filter cellFilter) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 848cb1f..553f598 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -446,7 +446,7 @@
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
- return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
+ return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
/**
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index bd3c719..5ad7c29 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -70,8 +70,8 @@
* @param scanType the type of scan operation being performed
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- ScanType scanType) {
- this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
+ ScanType scanType) {
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}
/**
@@ -81,20 +81,19 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
- public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
+ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
- TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index b6650f6..1b02609 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -74,7 +74,7 @@
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -294,9 +294,8 @@
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);
- long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
- // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
- Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
+ Transaction tx = txManager.startShort();
+ long now = tx.getVisibilityUpperBound();
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
@@ -354,7 +353,7 @@
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
- super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
+ super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}
@Override
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
index c46082f..0ca9f9c 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -40,8 +40,7 @@
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
- scanType, null));
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}
/**
@@ -51,15 +50,13 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
- scanType, cellFilter));
+ ScanType scanType, @Nullable Filter cellFilter) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 848cb1f..553f598 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -446,7 +446,7 @@
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
- return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
+ return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
/**
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index 81eb604..40e2c37 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -70,8 +70,8 @@
* @param scanType the type of scan operation being performed
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- ScanType scanType) {
- this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
+ ScanType scanType) {
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}
/**
@@ -81,20 +81,19 @@
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
- * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
- public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
+ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
- TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index b6650f6..1b02609 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -74,7 +74,7 @@
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
- return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -294,9 +294,8 @@
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);
- long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
- // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
- Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
+ Transaction tx = txManager.startShort();
+ long now = tx.getVisibilityUpperBound();
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
@@ -354,7 +353,7 @@
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
- super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
+ super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}
@Override