Support old clients on operation attribute name change
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index d962486..61ee3cc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -88,11 +88,19 @@
*/
public static final String TX_OPERATION_ATTRIBUTE_KEY = "tephra.tx";
/**
+ * @deprecated This constant is replaced by {@link #TX_OPERATION_ATTRIBUTE_KEY}
+ */
+ public static final String OLD_TX_OPERATION_ATTRIBUTE_KEY = "cask.tx";
+ /**
* Key used to flag a delete operation as part of a transaction rollback. This is used so that the
* {@code TransactionProcessor} coprocessor loaded on a table can differentiate between deletes issued
* as part of a normal client operation versus those performed when rolling back a transaction.
*/
public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "tephra.tx.rollback";
+ /**
+ * @deprecated This constant is replaced by {@link #TX_ROLLBACK_ATTRIBUTE_KEY}
+ */
+ public static final String OLD_TX_ROLLBACK_ATTRIBUTE_KEY = "cask.tx.rollback";
/**
* Column qualifier used for a special delete marker tombstone, which identifies an entire column family as deleted.
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
index f9a3ff3..4633248 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
@@ -139,7 +139,7 @@
byte[] family = change.getFamily();
byte[] qualifier = change.getQualifier();
Delete rollbackDelete = new Delete(row);
- rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ makeRollbackOperation(rollbackDelete);
switch (conflictLevel) {
case ROW:
case NONE:
@@ -612,4 +612,8 @@
public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
}
+
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
}
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
index 136dedc..c74f98d 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
- if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+ if (isRollbackOperation(delete)) {
return;
}
@@ -301,12 +301,22 @@
private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+ if (encoded == null) {
+ // to support old clients
+ encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+ }
if (encoded != null) {
return txCodec.decode(encoded);
}
return null;
}
+ private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+ return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+ // to support old clients
+ op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+ }
+
/**
* Derived classes can override this method to customize the filter used to return data visible for the current
* transaction.
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
index 6760ed7..1edd4d7 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+ import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1552,4 +1553,65 @@
assertTrue(result.isEmpty());
transactionContext.finish();
}
+
+ /**
+ * Tests that transaction co-processor works with older clients
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testOlderClientOperations() throws Exception {
+ // Use old HTable to test
+ TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+ transactionContext.addTransactionAware(oldTxAware);
+
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ oldTxAware.put(put);
+ transactionContext.finish();
+
+ transactionContext.start();
+ long txId = transactionContext.getCurrentTransaction().getTransactionId();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+ oldTxAware.put(put);
+ // Invalidate the second Put
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ txClient.invalidate(txId);
+
+ transactionContext.start();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+ oldTxAware.put(put);
+ // Abort the third Put
+ transactionContext.abort();
+
+ // Get should now return the first value
+ transactionContext.start();
+ Result result = oldTxAware.get(new Get(TestBytes.row));
+ transactionContext.finish();
+
+ byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value, value);
+ }
+
+ /**
+ * Represents older transaction clients
+ */
+ private static class OldTransactionAwareHTable extends TransactionAwareHTable {
+ public OldTransactionAwareHTable(HTableInterface hTable) {
+ super(hTable);
+ }
+
+ @Override
+ public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+ op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+ }
+
+ @Override
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
+ }
}
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
index 54babd1..9c04d8f 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
@@ -143,7 +143,7 @@
byte[] family = change.getFamily();
byte[] qualifier = change.getQualifier();
Delete rollbackDelete = new Delete(row);
- rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ makeRollbackOperation(rollbackDelete);
switch (conflictLevel) {
case ROW:
case NONE:
@@ -639,4 +639,8 @@
public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
}
+
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
}
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
index e8e045a..cc1915d 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
- if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+ if (isRollbackOperation(delete)) {
return;
}
@@ -301,12 +301,22 @@
private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+ if (encoded == null) {
+ // to support old clients
+ encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+ }
if (encoded != null) {
return txCodec.decode(encoded);
}
return null;
}
+ private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+ return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+ // to support old clients
+ op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+ }
+
/**
* Derived classes can override this method to customize the filter used to return data visible for the current
* transaction.
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
index 908f9c8..c4062df 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1549,4 +1550,65 @@
assertTrue(result.isEmpty());
transactionContext.finish();
}
+
+ /**
+ * Tests that transaction co-processor works with older clients
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testOlderClientOperations() throws Exception {
+ // Use old HTable to test
+ TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+ transactionContext.addTransactionAware(oldTxAware);
+
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ oldTxAware.put(put);
+ transactionContext.finish();
+
+ transactionContext.start();
+ long txId = transactionContext.getCurrentTransaction().getTransactionId();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+ oldTxAware.put(put);
+ // Invalidate the second Put
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ txClient.invalidate(txId);
+
+ transactionContext.start();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+ oldTxAware.put(put);
+ // Abort the third Put
+ transactionContext.abort();
+
+ // Get should now return the first value
+ transactionContext.start();
+ Result result = oldTxAware.get(new Get(TestBytes.row));
+ transactionContext.finish();
+
+ byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value, value);
+ }
+
+ /**
+ * Represents older transaction clients
+ */
+ private static class OldTransactionAwareHTable extends TransactionAwareHTable {
+ public OldTransactionAwareHTable(HTableInterface hTable) {
+ super(hTable);
+ }
+
+ @Override
+ public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+ op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+ }
+
+ @Override
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
+ }
}
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
index ea9fdad..62cdafd 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
@@ -143,7 +143,7 @@
byte[] family = change.getFamily();
byte[] qualifier = change.getQualifier();
Delete rollbackDelete = new Delete(row);
- rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ makeRollbackOperation(rollbackDelete);
switch (conflictLevel) {
case ROW:
case NONE:
@@ -671,4 +671,8 @@
public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
}
+
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
}
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
index 45ac114..f219373 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
- if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+ if (isRollbackOperation(delete)) {
return;
}
@@ -301,12 +301,22 @@
private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+ if (encoded == null) {
+ // to support old clients
+ encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+ }
if (encoded != null) {
return txCodec.decode(encoded);
}
return null;
}
+ private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+ return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+ // to support old clients
+ op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+ }
+
/**
* Derived classes can override this method to customize the filter used to return data visible for the current
* transaction.
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
index d31b972..1360252 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1549,4 +1550,65 @@
assertTrue(result.isEmpty());
transactionContext.finish();
}
+
+ /**
+ * Tests that transaction co-processor works with older clients
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testOlderClientOperations() throws Exception {
+ // Use old HTable to test
+ TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+ transactionContext.addTransactionAware(oldTxAware);
+
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ oldTxAware.put(put);
+ transactionContext.finish();
+
+ transactionContext.start();
+ long txId = transactionContext.getCurrentTransaction().getTransactionId();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+ oldTxAware.put(put);
+ // Invalidate the second Put
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ txClient.invalidate(txId);
+
+ transactionContext.start();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+ oldTxAware.put(put);
+ // Abort the third Put
+ transactionContext.abort();
+
+ // Get should now return the first value
+ transactionContext.start();
+ Result result = oldTxAware.get(new Get(TestBytes.row));
+ transactionContext.finish();
+
+ byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value, value);
+ }
+
+ /**
+ * Represents older transaction clients
+ */
+ private static class OldTransactionAwareHTable extends TransactionAwareHTable {
+ public OldTransactionAwareHTable(HTableInterface hTable) {
+ super(hTable);
+ }
+
+ @Override
+ public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+ op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+ }
+
+ @Override
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
+ }
}
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
index 3d0fb20..ed8cc25 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
@@ -143,7 +143,7 @@
byte[] family = change.getFamily();
byte[] qualifier = change.getQualifier();
Delete rollbackDelete = new Delete(row);
- rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ makeRollbackOperation(rollbackDelete);
switch (conflictLevel) {
case ROW:
case NONE:
@@ -671,4 +671,8 @@
public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
}
+
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
}
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
index 0d6ef17..1cb6564 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
- if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+ if (isRollbackOperation(delete)) {
return;
}
@@ -301,12 +301,22 @@
private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+ if (encoded == null) {
+ // to support old clients
+ encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+ }
if (encoded != null) {
return txCodec.decode(encoded);
}
return null;
}
+ private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+ return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+ // to support old clients
+ op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+ }
+
/**
* Derived classes can override this method to customize the filter used to return data visible for the current
* transaction.
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
index 6ff4bb1..0d54e89 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1549,4 +1550,65 @@
assertTrue(result.isEmpty());
transactionContext.finish();
}
+
+ /**
+ * Tests that transaction co-processor works with older clients
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testOlderClientOperations() throws Exception {
+ // Use old HTable to test
+ TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+ transactionContext.addTransactionAware(oldTxAware);
+
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ oldTxAware.put(put);
+ transactionContext.finish();
+
+ transactionContext.start();
+ long txId = transactionContext.getCurrentTransaction().getTransactionId();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+ oldTxAware.put(put);
+ // Invalidate the second Put
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ txClient.invalidate(txId);
+
+ transactionContext.start();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+ oldTxAware.put(put);
+ // Abort the third Put
+ transactionContext.abort();
+
+ // Get should now return the first value
+ transactionContext.start();
+ Result result = oldTxAware.get(new Get(TestBytes.row));
+ transactionContext.finish();
+
+ byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value, value);
+ }
+
+ /**
+ * Represents older transaction clients
+ */
+ private static class OldTransactionAwareHTable extends TransactionAwareHTable {
+ public OldTransactionAwareHTable(HTableInterface hTable) {
+ super(hTable);
+ }
+
+ @Override
+ public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+ op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+ }
+
+ @Override
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
+ }
}
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
index f28930e..4740f6b 100644
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
@@ -143,7 +143,7 @@
byte[] family = change.getFamily();
byte[] qualifier = change.getQualifier();
Delete rollbackDelete = new Delete(row);
- rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ makeRollbackOperation(rollbackDelete);
switch (conflictLevel) {
case ROW:
case NONE:
@@ -671,4 +671,8 @@
public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
}
+
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
}
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
index 8051279..27a7ef2 100644
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
- if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+ if (isRollbackOperation(delete)) {
return;
}
@@ -300,12 +300,22 @@
private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+ if (encoded == null) {
+ // to support old clients
+ encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+ }
if (encoded != null) {
return txCodec.decode(encoded);
}
return null;
}
+ private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+ return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+ // to support old clients
+ op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+ }
+
/**
* Derived classes can override this method to customize the filter used to return data visible for the current
* transaction.
diff --git a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
index 8578230..9968fb3 100644
--- a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1543,4 +1544,65 @@
assertTrue(result.isEmpty());
transactionContext.finish();
}
+
+ /**
+ * Tests that transaction co-processor works with older clients
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testOlderClientOperations() throws Exception {
+ // Use old HTable to test
+ TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+ transactionContext.addTransactionAware(oldTxAware);
+
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ oldTxAware.put(put);
+ transactionContext.finish();
+
+ transactionContext.start();
+ long txId = transactionContext.getCurrentTransaction().getTransactionId();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+ oldTxAware.put(put);
+ // Invalidate the second Put
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ txClient.invalidate(txId);
+
+ transactionContext.start();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+ oldTxAware.put(put);
+ // Abort the third Put
+ transactionContext.abort();
+
+ // Get should now return the first value
+ transactionContext.start();
+ Result result = oldTxAware.get(new Get(TestBytes.row));
+ transactionContext.finish();
+
+ byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value, value);
+ }
+
+ /**
+ * Represents older transaction clients
+ */
+ private static class OldTransactionAwareHTable extends TransactionAwareHTable {
+ public OldTransactionAwareHTable(HTableInterface hTable) {
+ super(hTable);
+ }
+
+ @Override
+ public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+ op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+ }
+
+ @Override
+ protected void makeRollbackOperation(Delete delete) {
+ delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
+ }
}