(TEPHRA-279) Make TransactionContext resilient to exceptions from getTransactionAwareName()
This closes #69 from GitHub.
Signed-off-by: anew <anew@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 3c11e96..0c846d6 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -122,13 +122,15 @@
for (TransactionAware txAware : txAwares) {
try {
txAware.startTx(currentTx);
- } catch (Throwable e) {
- String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- txClient.abort(currentTx);
- currentTx = null;
- throw new TransactionFailureException(message, e);
+ } catch (Throwable t) {
+ try {
+ txClient.abort(currentTx);
+ TransactionFailureException tfe = createTransactionFailure("start", txAware, t);
+ LOG.warn(tfe.getMessage());
+ throw tfe;
+ } finally {
+ currentTx = null;
+ }
}
}
}
@@ -147,8 +149,11 @@
checkForConflicts();
persist();
commit();
- postCommit();
- currentTx = null;
+ try {
+ postCommit();
+ } finally {
+ currentTx = null;
+ }
}
/**
@@ -228,23 +233,32 @@
boolean success = true;
for (TransactionAware txAware : txAwares) {
try {
- if (!txAware.rollbackTx()) {
- success = false;
- }
- } catch (Throwable e) {
- String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
+ success = txAware.rollbackTx() && success;
+ } catch (Throwable t) {
+ TransactionFailureException tfe = createTransactionFailure("roll back changes in", txAware, t);
+ LOG.warn(tfe.getMessage());
if (cause == null) {
- cause = new TransactionFailureException(message, e);
+ cause = tfe;
+ } else {
+ cause.addSuppressed(tfe);
}
success = false;
}
}
- if (success) {
- txClient.abort(currentTx);
- } else {
- txClient.invalidate(currentTx.getTransactionId());
+ try {
+ if (success) {
+ txClient.abort(currentTx);
+ } else {
+ txClient.invalidate(currentTx.getTransactionId());
+ }
+ } catch (Throwable t) {
+ if (cause == null) {
+ cause = new TransactionFailureException(
+ String.format("Error while calling transaction service to %s transaction %d.",
+ success ? "abort" : "invalidate", currentTx.getTransactionId()));
+ } else {
+ cause.addSuppressed(t);
+ }
}
if (cause != null) {
throw cause;
@@ -259,12 +273,11 @@
for (TransactionAware txAware : txAwares) {
try {
changes.addAll(txAware.getTxChanges());
- } catch (Throwable e) {
- String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
+ } catch (Throwable t) {
+ TransactionFailureException tfe = createTransactionFailure("retrieve changes from", txAware, t);
+ LOG.warn(tfe.getMessage());
// abort will throw that exception
+ abort(tfe);
}
}
try {
@@ -281,24 +294,18 @@
private void persist() throws TransactionFailureException {
for (TransactionAware txAware : txAwares) {
- boolean success;
+ boolean success = false;
Throwable cause = null;
try {
success = txAware.commitTx();
} catch (Throwable e) {
- success = false;
cause = e;
}
if (!success) {
- String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- if (cause == null) {
- LOG.warn(message);
- } else {
- LOG.warn(message, cause);
- }
- abort(new TransactionFailureException(message, cause));
+ TransactionFailureException tfe = createTransactionFailure("persist changes of", txAware, cause);
+ LOG.warn(tfe.getMessage());
// abort will throw that exception
+ abort(tfe);
}
}
}
@@ -321,15 +328,38 @@
for (TransactionAware txAware : txAwares) {
try {
txAware.postTxCommit();
- } catch (Throwable e) {
- String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- cause = new TransactionFailureException(message, e);
+ } catch (Throwable t) {
+ TransactionFailureException tfe = createTransactionFailure("perform post-commit for", txAware, t);
+ LOG.warn(tfe.getMessage());
+ if (cause == null) {
+ cause = tfe;
+ } else {
+ cause.addSuppressed(tfe);
+ }
}
}
if (cause != null) {
throw cause;
}
}
+
+ private TransactionFailureException createTransactionFailure(String action,
+ TransactionAware txAware,
+ Throwable cause) {
+ String txAwareName;
+ Throwable thrownForName = null;
+ try {
+ txAwareName = txAware.getTransactionAwareName();
+ } catch (Throwable t) {
+ thrownForName = t;
+ txAwareName = "unknown";
+ }
+ TransactionFailureException tfe = new TransactionFailureException(
+ String.format("Unable to %s transaction-aware '%s' for transaction %d",
+ action, txAwareName, currentTx.getTransactionId()), cause);
+ if (thrownForName != null) {
+ tfe.addSuppressed(thrownForName);
+ }
+ return tfe;
+ }
}
diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
index 54e8a8c..3ed2b88 100644
--- a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
+++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
@@ -40,6 +40,7 @@
InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+ InduceFailure failGetName = InduceFailure.NoFailure;
void addChange(byte[] key) {
changes.add(key);
@@ -118,6 +119,9 @@
@Override
public String getTransactionAwareName() {
+ if (failGetName == InduceFailure.ThrowException) {
+ throw new RuntimeException("get name failure");
+ }
return "dummy";
}
}
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index fcf793e..9502ccf 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -501,4 +501,82 @@
Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
}
+
+ @Test
+ public void testGetTxAwareNameFails() throws TransactionFailureException {
+
+ // tests that under any scneario that can make a transaction fail, exceptions from
+ // getTransactionAwareName() do not affect proper abort, and the transaction context's
+ // state is clear (no current transaction) afterwards.
+ TransactionContext context = newTransactionContext(ds1);
+
+ ds1.failGetName = DummyTxAware.InduceFailure.ThrowException;
+ // the txAware will throw exceptions whenever getTransactionAwareName() is called.
+ // This is called in various failure scenarios. Test these scenarios one by one and check that
+ // the tx context is still functional after that.
+
+ // test failure during startTx()
+ ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ try {
+ context.start();
+ Assert.fail("Start should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("start failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during getTxChanges()
+ ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.finish();
+ Assert.fail("Get changes should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("changes failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during commitTx()
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.finish();
+ Assert.fail("Persist should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("persist failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during rollbackTx()
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.abort();
+ Assert.fail("Rollback should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("rollback failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during postTxCommit()
+ ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.finish();
+ Assert.fail("Post Commit should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("post failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ Assert.assertTrue(context.removeTransactionAware(ds1));
+ context.start();
+ context.finish();
+ Assert.assertNull(context.getCurrentTransaction());
+ }
}