TEPHRA-299 Executing a large batch delete is very slow.
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index 18886c7..347c1fe 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
@@ -316,7 +317,7 @@
if (tx == null) {
throw new IOException("Transaction not started");
}
- hTable.delete(transactionalizeAction(delete));
+ hTable.put(transactionalizeAction(delete));
}
@Override
@@ -324,12 +325,12 @@
if (tx == null) {
throw new IOException("Transaction not started");
}
- List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+ List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
for (Delete delete : deletes) {
- Delete txDelete = transactionalizeAction(delete);
+ Put txDelete = transactionalizeAction(delete);
transactionalizedDeletes.add(txDelete);
}
- hTable.delete(transactionalizedDeletes);
+ hTable.put(transactionalizedDeletes);
}
@Override
@@ -541,11 +542,11 @@
return txPut;
}
- private Delete transactionalizeAction(Delete delete) throws IOException {
+ private Put transactionalizeAction(Delete delete) throws IOException {
long transactionTimestamp = tx.getWritePointer();
byte[] deleteRow = delete.getRow();
- Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+ Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
if (familyToDelete.isEmpty()) {
@@ -556,6 +557,8 @@
// Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
// no need to identify individual columns deleted
+ deleteMarkers.add(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, columnDescriptor.getName(), null);
}
} else {
@@ -565,7 +568,8 @@
for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+ deleteMarkers.add(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
}
}
@@ -583,31 +587,34 @@
if (conflictLevel == TxConstants.ConflictDetection.ROW ||
conflictLevel == TxConstants.ConflictDetection.NONE) {
// no need to identify individual columns deleted
- txDelete.deleteFamily(family);
+ deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, family, null);
} else {
Result result = get(new Get(delete.getRow()).addFamily(family));
// Delete entire family
NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+ deleteMarkers.add(family, column.getKey(), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, family, column.getKey());
}
}
} else {
for (Cell value : entries) {
- txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+ deleteMarkers.add(value.getFamily(), value.getQualifier(), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
}
}
}
}
for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
- txDelete.setAttribute(entry.getKey(), entry.getValue());
+ deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
}
- txDelete.setDurability(delete.getDurability());
- addToOperation(txDelete, tx);
- return txDelete;
+ deleteMarkers.setDurability(delete.getDurability());
+ addToOperation(deleteMarkers, tx);
+ return deleteMarkers;
}
private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
diff --git a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index e3ef374..981143e 100644
--- a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
@@ -317,7 +318,7 @@
if (tx == null) {
throw new IOException("Transaction not started");
}
- hTable.delete(transactionalizeAction(delete));
+ hTable.put(transactionalizeAction(delete));
}
@Override
@@ -325,12 +326,12 @@
if (tx == null) {
throw new IOException("Transaction not started");
}
- List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+ List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
for (Delete delete : deletes) {
- Delete txDelete = transactionalizeAction(delete);
+ Put txDelete = transactionalizeAction(delete);
transactionalizedDeletes.add(txDelete);
}
- hTable.delete(transactionalizedDeletes);
+ hTable.put(transactionalizedDeletes);
}
@Override
@@ -564,11 +565,11 @@
return txPut;
}
- private Delete transactionalizeAction(Delete delete) throws IOException {
+ private Put transactionalizeAction(Delete delete) throws IOException {
long transactionTimestamp = tx.getWritePointer();
byte[] deleteRow = delete.getRow();
- Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+ Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
if (familyToDelete.isEmpty()) {
@@ -579,6 +580,8 @@
// Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
// no need to identify individual columns deleted
+ deleteMarkers.add(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, columnDescriptor.getName(), null);
}
} else {
@@ -588,7 +591,8 @@
for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+ deleteMarkers.add(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
}
}
@@ -606,31 +610,34 @@
if (conflictLevel == TxConstants.ConflictDetection.ROW ||
conflictLevel == TxConstants.ConflictDetection.NONE) {
// no need to identify individual columns deleted
- txDelete.deleteFamily(family);
+ deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, family, null);
} else {
Result result = get(new Get(delete.getRow()).addFamily(family));
// Delete entire family
NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+ deleteMarkers.add(family, column.getKey(), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, family, column.getKey());
}
}
} else {
for (Cell value : entries) {
- txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+ deleteMarkers.add(value.getFamily(), value.getQualifier(), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
}
}
}
}
for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
- txDelete.setAttribute(entry.getKey(), entry.getValue());
+ deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
}
- txDelete.setDurability(delete.getDurability());
- addToOperation(txDelete, tx);
- return txDelete;
+ deleteMarkers.setDurability(delete.getDurability());
+ addToOperation(deleteMarkers, tx);
+ return deleteMarkers;
}
private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index b35c8aa..4a99378 100644
--- a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
@@ -293,7 +294,7 @@
if (tx == null) {
throw new IOException("Transaction not started");
}
- hTable.delete(transactionalizeAction(delete));
+ hTable.put(transactionalizeAction(delete));
}
@Override
@@ -301,12 +302,12 @@
if (tx == null) {
throw new IOException("Transaction not started");
}
- List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+ List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
for (Delete delete : deletes) {
- Delete txDelete = transactionalizeAction(delete);
+ Put txDelete = transactionalizeAction(delete);
transactionalizedDeletes.add(txDelete);
}
- hTable.delete(transactionalizedDeletes);
+ hTable.put(transactionalizedDeletes);
}
@Override
@@ -631,11 +632,11 @@
return txPut;
}
- private Delete transactionalizeAction(Delete delete) throws IOException {
+ private Put transactionalizeAction(Delete delete) throws IOException {
long transactionTimestamp = tx.getWritePointer();
byte[] deleteRow = delete.getRow();
- Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+ Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
if (familyToDelete.isEmpty()) {
@@ -648,6 +649,8 @@
// changeSet
for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
// no need to identify individual columns deleted
+ deleteMarkers.addColumn(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, columnDescriptor.getName(), null);
}
} else {
@@ -657,7 +660,8 @@
for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.addColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+ deleteMarkers.addColumn(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
}
}
@@ -675,32 +679,33 @@
if (conflictLevel == TxConstants.ConflictDetection.ROW
|| conflictLevel == TxConstants.ConflictDetection.NONE) {
// no need to identify individual columns deleted
- txDelete.addFamily(family);
+ deleteMarkers.addColumn(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, family, null);
} else {
Result result = get(new Get(delete.getRow()).addFamily(family));
// Delete entire family
NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.addColumns(family, column.getKey(), transactionTimestamp);
+ deleteMarkers.addColumn(family, column.getKey(), transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, family, column.getKey());
}
}
} else {
for (Cell value : entries) {
- txDelete.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value),
- transactionTimestamp);
+ deleteMarkers.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value), transactionTimestamp,
+ HConstants.EMPTY_BYTE_ARRAY);
addToChangeSet(deleteRow, CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value));
}
}
}
}
for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
- txDelete.setAttribute(entry.getKey(), entry.getValue());
+ deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
}
- txDelete.setDurability(delete.getDurability());
- addToOperation(txDelete, tx);
- return txDelete;
+ deleteMarkers.setDurability(delete.getDurability());
+ addToOperation(deleteMarkers, tx);
+ return deleteMarkers;
}
private List<? extends Row> transactionalizeActions(List<? extends Row> actions)