TEPHRA-45 Support empty values by using cell tag delete markers
diff --git a/tephra-core/src/main/java/co/cask/tephra/TxConstants.java b/tephra-core/src/main/java/co/cask/tephra/TxConstants.java
index 02c9acd..6d0e0b2 100644
--- a/tephra-core/src/main/java/co/cask/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/co/cask/tephra/TxConstants.java
@@ -26,7 +26,7 @@
  */
 public class TxConstants {
   /**
-   * property set for {@link org.apache.hadoop.hbase.HColumnDescriptor} to configure time-to-live on data within
+   * property set for {@code org.apache.hadoop.hbase.HColumnDescriptor} to configure time-to-live on data within
    * the column family.  The value given is in milliseconds.  Once a cell's data has surpassed the given value in age,
    * the cell's data will no longer be visible and may be garbage collected.
    */
@@ -55,7 +55,11 @@
    * overwritten with an empty {@code byte[]} to flag it as removed.  Cells with empty values will be filtered out
    * of the results for read operations.  If cells with empty values should be included in results (meaning data
    * cannot be transactionally deleted), then set this configuration property to true.
+   *
+   * @deprecated With HBase 0.98+, delete markers are indicated using cell tags, so empty values are implicitly
+   * allowed.
    */
+  @Deprecated
   public static final String ALLOW_EMPTY_VALUES_KEY = "data.tx.allow.empty.values";
   public static final boolean ALLOW_EMPTY_VALUES_DEFAULT = false;
 
@@ -63,6 +67,10 @@
    * Key used to set the serialized transaction as an attribute on Get and Scan operations.
    */
   public static final String TX_OPERATION_ATTRIBUTE_KEY = "cask.tx";
+  /**
+   * Attribute key used to identify a Put as a delete marker
+   */
+  public static final String DELETE_OPERATION_ATTRIBUTE_KEY = "cask.tx.delete";
 
   // Constants for monitoring status
   public static final String STATUS_OK = "OK";
@@ -237,6 +245,9 @@
     public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
     public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
     public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
+
+    /** Cell tag type used for delete markers */
+    public static final byte CELL_TAG_TYPE_DELETE = (byte) 193;
   }
 
   /**
diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java
index 872cb93..dd47d15 100644
--- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java
@@ -24,9 +24,11 @@
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -62,6 +64,8 @@
  * was started.
  */
 public class TransactionAwareHTable implements HTableInterface, TransactionAware {
+  public static final Tag DELETE_TAG = new Tag(TxConstants.HBase.CELL_TAG_TYPE_DELETE, new byte[0]);
+
   private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
   private Transaction tx;
   private final HTableInterface hTable;
@@ -539,12 +543,12 @@
 
   private Put transactionalizeAction(Put put) throws IOException {
     Put txPut = new Put(put.getRow(), tx.getWritePointer());
-    Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
+    Set<Map.Entry<byte[], List<Cell>>> familyMap = put.getFamilyCellMap().entrySet();
     if (!familyMap.isEmpty()) {
-      for (Map.Entry<byte[], List<KeyValue>> family : familyMap) {
-        List<KeyValue> familyValues = family.getValue();
+      for (Map.Entry<byte[], List<Cell>> family : familyMap) {
+        List<Cell> familyValues = family.getValue();
         if (!familyValues.isEmpty()) {
-          for (KeyValue value : familyValues) {
+          for (Cell value : familyValues) {
             txPut.add(value.getFamily(), value.getQualifier(), tx.getWritePointer(), value.getValue());
             changeSet.add(new ActionChange(txPut.getRow(), value.getFamily(), value.getQualifier()));
           }
@@ -555,7 +559,6 @@
       txPut.setAttribute(entry.getKey(), entry.getValue());
     }
     txPut.setWriteToWAL(put.getWriteToWAL());
-    addToOperation(txPut, tx);
     return txPut;
   }
 
@@ -564,6 +567,7 @@
 
     byte[] deleteRow = delete.getRow();
     Put txPut = new Put(deleteRow, transactionTimestamp);
+    txPut.setAttribute(TxConstants.DELETE_OPERATION_ATTRIBUTE_KEY, new byte[0]);
 
     Map<byte[], List<KeyValue>> familyToDelete = delete.getFamilyMap();
     if (familyToDelete.isEmpty()) {
diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java
index 01cc06a..8254887 100644
--- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java
@@ -25,17 +25,24 @@
 import co.cask.tephra.persist.TransactionSnapshot;
 import co.cask.tephra.util.TxUtils;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -49,6 +56,7 @@
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
@@ -56,6 +64,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
 
 /**
@@ -88,12 +97,15 @@
  * </p>
  */
 public class TransactionProcessor extends BaseRegionObserver {
+  public static final List<Tag> DELETE_TAGS = ImmutableList.<Tag>of(
+      new Tag(TxConstants.HBase.CELL_TAG_TYPE_DELETE, new byte[]{1}));
+
   private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
 
   private TransactionStateCache cache;
   private final TransactionCodec txCodec;
+
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-  protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -123,9 +135,6 @@
         }
         ttlByFamily.put(columnDesc.getName(), ttl);
       }
-
-      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
-                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
     }
   }
 
@@ -151,6 +160,27 @@
   }
 
   @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
+      throws IOException {
+    if (put.getAttribute(TxConstants.DELETE_OPERATION_ATTRIBUTE_KEY) != null) {
+      LOG.info("Received a delete for " + put);
+      NavigableMap<byte[], List<Cell>> newFamilyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+
+      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap().entrySet()) {
+        List<Cell> newCells = Lists.newArrayListWithCapacity(entry.getValue().size());
+        for (Cell cell : entry.getValue()) {
+          LOG.info("Converting cell: " + cell);
+          newCells.add(new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
+              CellUtil.cloneQualifier(cell), cell.getTimestamp(), CellUtil.cloneValue(cell),
+              ImmutableList.<Tag>of(new Tag(TxConstants.HBase.CELL_TAG_TYPE_DELETE, new byte[]{1}))));
+        }
+        newFamilyMap.put(entry.getKey(), newCells);
+      }
+      put.setFamilyCellMap(newFamilyMap);
+    }
+  }
+
+  @Override
   public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
     throws IOException {
     Transaction tx = getFromOperation(scan);
@@ -223,7 +253,7 @@
    * @param type the type of scan being performed
    */
   protected Filter getTransactionFilter(Transaction tx, ScanType type) {
-    return new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type);
+    return new TransactionVisibilityFilter(tx, ttlByFamily, type);
   }
 
   /**
diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java
index b938b7f..5b53e77 100644
--- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java
@@ -19,8 +19,11 @@
 import co.cask.tephra.Transaction;
 import co.cask.tephra.TxConstants;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -36,11 +39,11 @@
  * any {@code Scan} or {@code Get} operation performed.
  */
 public class TransactionVisibilityFilter extends FilterBase {
+  private static final Log LOG = LogFactory.getLog(TransactionVisibilityFilter.class);
+
   private final Transaction tx;
   // oldest visible timestamp by column family, used to apply TTL when reading
   private final Map<byte[], Long> oldestTsByFamily;
-  // if false, empty values will be interpreted as deletes
-  private final boolean allowEmptyValues;
   // whether or not we can remove delete markers
   // these can only be safely removed when we are traversing all storefiles
   private final boolean clearDeletes;
@@ -56,13 +59,10 @@
    *
    * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
    * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
-   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
-   *                         these will be interpreted as "delete" markers and the column will be filtered out
    * @param scanType the type of scan operation being performed
    */
-  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
-                                     ScanType scanType) {
-    this(tx, ttlByFamily, allowEmptyValues, scanType, null);
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, ScanType scanType) {
+    this(tx, ttlByFamily, scanType, null);
   }
 
   /**
@@ -70,15 +70,13 @@
    *
    * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
    * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
-   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
-   *                         these will be interpreted as "delete" markers and the column will be filtered out
    * @param scanType the type of scan operation being performed
    * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
    *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}.  If null, then
    *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
    */
-  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
-                                     ScanType scanType, @Nullable Filter cellFilter) {
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, ScanType scanType,
+                                     @Nullable Filter cellFilter) {
     this.tx = tx;
     this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
     for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
@@ -86,7 +84,6 @@
       oldestTsByFamily.put(ttlEntry.getKey(),
                            familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
     }
-    this.allowEmptyValues = allowEmptyValues;
     this.clearDeletes =
         scanType == ScanType.COMPACT_DROP_DELETES || scanType == ScanType.USER_SCAN;
     this.cellFilter = cellFilter;
@@ -106,11 +103,13 @@
       // passed TTL for this column, seek to next
       return ReturnCode.NEXT_COL;
     } else if (tx.isVisible(kvTimestamp)) {
-      if (cell.getValueLength() == 0 && !allowEmptyValues) {
+      if (isDelete(cell)) {
         if (clearDeletes) {
+          LOG.info("Clearing delete marker");
           // skip "deleted" cell
           return ReturnCode.NEXT_COL;
         } else {
+          LOG.info("Keeping delete marker");
           // keep the marker but skip any remaining versions
           return ReturnCode.INCLUDE_AND_NEXT_COL;
         }
@@ -127,6 +126,18 @@
     }
   }
 
+  private boolean isDelete(Cell cell) {
+    LOG.info("Cell: " + cell);
+    LOG.info("Tag offset: " + cell.getTagsOffset() + ", length: " + cell.getTagsLength());
+    Tag deleteTag = null;
+    if (cell.getTagsLength() > 0) {
+      deleteTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+          TxConstants.HBase.CELL_TAG_TYPE_DELETE);
+    }
+    LOG.info("Tag: " + deleteTag);
+    return deleteTag != null;
+  }
+
   @Override
   public byte[] toByteArray() throws IOException {
     return super.toByteArray();
diff --git a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/TransactionAwareHTableTest.java
index 1ae52f7..f9a405a 100644
--- a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/TransactionAwareHTableTest.java
@@ -23,6 +23,8 @@
 import co.cask.tephra.persist.InMemoryTransactionStateStorage;
 import co.cask.tephra.persist.TransactionStateStorage;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -43,6 +45,11 @@
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for TransactionAwareHTables.
  */
@@ -122,7 +129,7 @@
     transactionContext.finish();
 
     byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
-    Assert.assertArrayEquals(TestBytes.value, value);
+    assertArrayEquals(TestBytes.value, value);
   }
 
   /**
@@ -142,7 +149,7 @@
     Result result = transactionAwareHTable.get(new Get(TestBytes.row));
     transactionContext.finish();
     byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
-    Assert.assertArrayEquals(value, null);
+    assertArrayEquals(value, null);
   }
 
   /**
@@ -161,7 +168,7 @@
     Result result = transactionAwareHTable.get(new Get(TestBytes.row));
     transactionContext.finish();
     byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
-    Assert.assertArrayEquals(TestBytes.value, value);
+    assertArrayEquals(TestBytes.value, value);
 
     transactionContext.start();
     Delete delete = new Delete(TestBytes.row);
@@ -192,7 +199,7 @@
     Result result = transactionAwareHTable.get(new Get(TestBytes.row));
     transactionContext.finish();
     byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
-    Assert.assertArrayEquals(TestBytes.value, value);
+    assertArrayEquals(TestBytes.value, value);
 
     transactionContext.start();
     Delete delete = new Delete(TestBytes.row);
@@ -203,7 +210,7 @@
     result = transactionAwareHTable.get(new Get(TestBytes.row));
     transactionContext.finish();
     value = result.getValue(TestBytes.family, TestBytes.qualifier);
-    Assert.assertArrayEquals(TestBytes.value, value);
+    assertArrayEquals(TestBytes.value, value);
   }
 
   /**
@@ -255,10 +262,10 @@
 
     Get get = new Get(TestBytes.row);
     Result row = transactionAwareHTable.get(get);
-    Assert.assertFalse(row.isEmpty());
+    assertFalse(row.isEmpty());
     byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier);
-    Assert.assertNotNull(col1Value);
-    Assert.assertArrayEquals(value, col1Value);
+    assertNotNull(col1Value);
+    assertArrayEquals(value, col1Value);
     // write from in-progress transaction should not be visible
     byte[] col2Value = row.getValue(TestBytes.family, col2);
     Assert.assertNull(col2Value);
@@ -268,7 +275,7 @@
 
     get = new Get(TestBytes.row);
     row = transactionAwareHTable.get(get);
-    Assert.assertFalse(row.isEmpty());
+    assertFalse(row.isEmpty());
     col2Value = row.getValue(TestBytes.family, col2);
     Assert.assertNull(col2Value);
 
@@ -276,4 +283,36 @@
 
     inprogressTxContext2.abort();
   }
+
+  /**
+   * Tests that empty values can be stored, since delete markers are now done using cell tags.
+   */
+  @Test
+  public void testEmptyValues() throws Exception {
+    // test a normal delete
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    transactionAwareHTable.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
+    transactionContext.finish();
+    assertNotNull(result);
+    assertFalse(result.isEmpty());
+    Cell cell = result.getColumnLatestCell(TestBytes.family, TestBytes.qualifier);
+    assertNotNull(cell);
+    assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cell));
+
+    transactionContext.start();
+    transactionAwareHTable.delete(new Delete(TestBytes.row));
+    transactionContext.finish();
+
+    transactionContext.start();
+    result = transactionAwareHTable.get(new Get(TestBytes.row));
+    transactionContext.finish();
+    assertNotNull(result);
+    assertTrue(result.isEmpty());
+  }
 }
diff --git a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
index 6f16654..f86b406 100644
--- a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
@@ -125,6 +126,7 @@
     dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
     dfsCluster.waitActive();
     conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf());
+    conf.set("hfile.format.version", "3");
 
     conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
     conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
@@ -296,6 +298,7 @@
         if (i % 2 == 1) {
           // deletes are performed as puts with empty values
           Put deletePut = new Put(row);
+          deletePut.setAttribute(TxConstants.DELETE_OPERATION_ATTRIBUTE_KEY, new byte[0]);
           deletePut.add(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]);
           region.put(deletePut);
         }
@@ -304,7 +307,7 @@
       // read all back
       scan = new Scan(row);
       scan.setFilter(new TransactionVisibilityFilter(
-          TxUtils.createDummyTransaction(txSnapshot), new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+          TxUtils.createDummyTransaction(txSnapshot), new TreeMap<byte[], Long>(), ScanType.USER_SCAN));
       regionScanner = region.getScanner(scan);
       results = Lists.newArrayList();
       assertFalse(regionScanner.next(results));
@@ -326,7 +329,7 @@
 
       scan = new Scan(row);
       scan.setFilter(new TransactionVisibilityFilter(
-          TxUtils.createDummyTransaction(txSnapshot), new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+          TxUtils.createDummyTransaction(txSnapshot), new TreeMap<byte[], Long>(), ScanType.USER_SCAN));
       regionScanner = region.getScanner(scan);
       results = Lists.newArrayList();
       assertFalse(regionScanner.next(results));
diff --git a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilterTest.java
index f00dfd1..b0f9a9e 100644
--- a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionVisibilityFilterTest.java
@@ -125,7 +125,7 @@
   }
 
   protected Filter createFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-    return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN);
+    return new TransactionVisibilityFilter(tx, familyTTLs, ScanType.USER_SCAN);
   }
 
   protected KeyValue newKeyValue(String rowkey, String value, long timestamp) {