TEPHRA-299 Executing a large batch delete is very slow.
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index 18886c7..347c1fe 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
@@ -316,7 +317,7 @@
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    hTable.delete(transactionalizeAction(delete));
+    hTable.put(transactionalizeAction(delete));
   }
 
   @Override
@@ -324,12 +325,12 @@
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+    List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
     for (Delete delete : deletes) {
-      Delete txDelete = transactionalizeAction(delete);
+      Put txDelete = transactionalizeAction(delete);
       transactionalizedDeletes.add(txDelete);
     }
-    hTable.delete(transactionalizedDeletes);
+    hTable.put(transactionalizedDeletes);
   }
 
   @Override
@@ -541,11 +542,11 @@
     return txPut;
   }
 
-  private Delete transactionalizeAction(Delete delete) throws IOException {
+  private Put transactionalizeAction(Delete delete) throws IOException {
     long transactionTimestamp = tx.getWritePointer();
 
     byte[] deleteRow = delete.getRow();
-    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
 
     Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
     if (familyToDelete.isEmpty()) {
@@ -556,6 +557,8 @@
         // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
         for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
           // no need to identify individual columns deleted
+          deleteMarkers.add(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+            HConstants.EMPTY_BYTE_ARRAY);
           addToChangeSet(deleteRow, columnDescriptor.getName(), null);
         }
       } else {
@@ -565,7 +568,8 @@
         for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
           NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
           for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            deleteMarkers.add(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
           }
         }
@@ -583,31 +587,34 @@
           if (conflictLevel == TxConstants.ConflictDetection.ROW ||
               conflictLevel == TxConstants.ConflictDetection.NONE) {
             // no need to identify individual columns deleted
-            txDelete.deleteFamily(family);
+            deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, family, null);
           } else {
             Result result = get(new Get(delete.getRow()).addFamily(family));
             // Delete entire family
             NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
             for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+              deleteMarkers.add(family, column.getKey(), transactionTimestamp,
+                HConstants.EMPTY_BYTE_ARRAY);
               addToChangeSet(deleteRow, family, column.getKey());
             }
           }
         } else {
           for (Cell value : entries) {
-            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+            deleteMarkers.add(value.getFamily(), value.getQualifier(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
           }
         }
       }
     }
     for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-        txDelete.setAttribute(entry.getKey(), entry.getValue());
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
     }
-    txDelete.setDurability(delete.getDurability());
-    addToOperation(txDelete, tx);
-    return txDelete;
+    deleteMarkers.setDurability(delete.getDurability());
+    addToOperation(deleteMarkers, tx);
+    return deleteMarkers;
   }
 
   private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
diff --git a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index e3ef374..981143e 100644
--- a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
@@ -317,7 +318,7 @@
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    hTable.delete(transactionalizeAction(delete));
+    hTable.put(transactionalizeAction(delete));
   }
 
   @Override
@@ -325,12 +326,12 @@
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+    List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
     for (Delete delete : deletes) {
-      Delete txDelete = transactionalizeAction(delete);
+      Put txDelete = transactionalizeAction(delete);
       transactionalizedDeletes.add(txDelete);
     }
-    hTable.delete(transactionalizedDeletes);
+    hTable.put(transactionalizedDeletes);
   }
 
   @Override
@@ -564,11 +565,11 @@
     return txPut;
   }
 
-  private Delete transactionalizeAction(Delete delete) throws IOException {
+  private Put transactionalizeAction(Delete delete) throws IOException {
     long transactionTimestamp = tx.getWritePointer();
 
     byte[] deleteRow = delete.getRow();
-    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
 
     Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
     if (familyToDelete.isEmpty()) {
@@ -579,6 +580,8 @@
         // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
         for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
           // no need to identify individual columns deleted
+          deleteMarkers.add(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+            HConstants.EMPTY_BYTE_ARRAY);
           addToChangeSet(deleteRow, columnDescriptor.getName(), null);
         }
       } else {
@@ -588,7 +591,8 @@
         for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
           NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
           for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            deleteMarkers.add(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
           }
         }
@@ -606,31 +610,34 @@
           if (conflictLevel == TxConstants.ConflictDetection.ROW ||
               conflictLevel == TxConstants.ConflictDetection.NONE) {
             // no need to identify individual columns deleted
-            txDelete.deleteFamily(family);
+            deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, family, null);
           } else {
             Result result = get(new Get(delete.getRow()).addFamily(family));
             // Delete entire family
             NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
             for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+              deleteMarkers.add(family, column.getKey(), transactionTimestamp,
+                HConstants.EMPTY_BYTE_ARRAY);
               addToChangeSet(deleteRow, family, column.getKey());
             }
           }
         } else {
           for (Cell value : entries) {
-            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+            deleteMarkers.add(value.getFamily(), value.getQualifier(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
           }
         }
       }
     }
     for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-        txDelete.setAttribute(entry.getKey(), entry.getValue());
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
     }
-    txDelete.setDurability(delete.getDurability());
-    addToOperation(txDelete, tx);
-    return txDelete;
+    deleteMarkers.setDurability(delete.getDurability());
+    addToOperation(deleteMarkers, tx);
+    return deleteMarkers;
   }
 
   private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index b35c8aa..4a99378 100644
--- a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
@@ -293,7 +294,7 @@
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    hTable.delete(transactionalizeAction(delete));
+    hTable.put(transactionalizeAction(delete));
   }
 
   @Override
@@ -301,12 +302,12 @@
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+    List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
     for (Delete delete : deletes) {
-      Delete txDelete = transactionalizeAction(delete);
+      Put txDelete = transactionalizeAction(delete);
       transactionalizedDeletes.add(txDelete);
     }
-    hTable.delete(transactionalizedDeletes);
+    hTable.put(transactionalizedDeletes);
   }
 
   @Override
@@ -631,11 +632,11 @@
     return txPut;
   }
 
-  private Delete transactionalizeAction(Delete delete) throws IOException {
+  private Put transactionalizeAction(Delete delete) throws IOException {
     long transactionTimestamp = tx.getWritePointer();
 
     byte[] deleteRow = delete.getRow();
-    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
 
     Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
     if (familyToDelete.isEmpty()) {
@@ -648,6 +649,8 @@
         // changeSet
         for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
           // no need to identify individual columns deleted
+          deleteMarkers.addColumn(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+            HConstants.EMPTY_BYTE_ARRAY);
           addToChangeSet(deleteRow, columnDescriptor.getName(), null);
         }
       } else {
@@ -657,7 +660,8 @@
         for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
           NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
           for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-            txDelete.addColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            deleteMarkers.addColumn(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
           }
         }
@@ -675,32 +679,33 @@
           if (conflictLevel == TxConstants.ConflictDetection.ROW
               || conflictLevel == TxConstants.ConflictDetection.NONE) {
             // no need to identify individual columns deleted
-            txDelete.addFamily(family);
+            deleteMarkers.addColumn(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, family, null);
           } else {
             Result result = get(new Get(delete.getRow()).addFamily(family));
             // Delete entire family
             NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
             for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-              txDelete.addColumns(family, column.getKey(), transactionTimestamp);
+              deleteMarkers.addColumn(family, column.getKey(), transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
               addToChangeSet(deleteRow, family, column.getKey());
             }
           }
         } else {
           for (Cell value : entries) {
-            txDelete.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value),
-              transactionTimestamp);
+            deleteMarkers.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value));
           }
         }
       }
     }
     for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-      txDelete.setAttribute(entry.getKey(), entry.getValue());
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
     }
-    txDelete.setDurability(delete.getDurability());
-    addToOperation(txDelete, tx);
-    return txDelete;
+    deleteMarkers.setDurability(delete.getDurability());
+    addToOperation(deleteMarkers, tx);
+    return deleteMarkers;
   }
 
   private List<? extends Row> transactionalizeActions(List<? extends Row> actions)