fixes #785 Added meaningful message to commit exception
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java
index 6a1e2ed..4e78f60 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java
@@ -41,6 +41,6 @@
/**
* Called when async commit of a transaction fails because it overlapped with another transaction
*/
- void commitFailed();
+ void commitFailed(String msg);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
index 67f465c..403fb66 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
@@ -112,9 +112,9 @@
}
@Override
- public void commitFailed() {
+ public void commitFailed(String msg) {
try {
- aco.commitFailed();
+ aco.commitFailed(msg);
} finally {
finish(TxResult.COMMIT_EXCEPTION);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java b/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java
index 2053c99..314859e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java
@@ -27,6 +27,7 @@
private volatile boolean committed = false;
private volatile boolean aacked = false;
private volatile Exception error = null;
+ private volatile String commitFailMsg = "";
@Override
public void committed() {
@@ -47,8 +48,9 @@
}
@Override
- public void commitFailed() {
+ public void commitFailed(String msg) {
committed = false;
+ commitFailMsg = msg;
cdl.countDown();
}
@@ -65,7 +67,7 @@
} else if (aacked) {
throw new AlreadyAcknowledgedException();
} else if (!committed) {
- throw new CommitException();
+ throw new CommitException(commitFailMsg);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index be375c6..ad36fd2 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -100,7 +100,7 @@
}
@Override
- public void commitFailed() {
+ public void commitFailed(String msg) {
txi = null;
// retry transaction
executor.submit(this);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 441517a..e2e6d32 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -77,6 +77,7 @@
import org.apache.fluo.core.util.ConditionalFlutation;
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.Flutation;
+import org.apache.fluo.core.util.Hex;
import org.apache.fluo.core.util.SpanUtil;
/**
@@ -408,6 +409,38 @@
return prow + " " + pcol + " " + pval + " " + rejected.size();
}
+ public String getShortCollisionMessage() {
+ StringBuilder sb = new StringBuilder();
+ if (rejected.size() > 0) {
+ int numCollisions = 0;
+ for (Set<Column> cols : rejected.values()) {
+ numCollisions += cols.size();
+ }
+
+ sb.append("Collsions(");
+ sb.append(numCollisions);
+ sb.append("):");
+
+ String sep = "";
+ outer: for (Entry<Bytes, Set<Column>> entry : rejected.entrySet()) {
+ Bytes row = entry.getKey();
+ for (Column col : entry.getValue()) {
+ sb.append(sep);
+ sep = ", ";
+ Hex.encNonAscii(sb, row);
+ sb.append(" ");
+ Hex.encNonAscii(sb, col, " ");
+ if (sb.length() > 100) {
+ sb.append(" ...");
+ break outer;
+ }
+ }
+ }
+ }
+
+ return sb.toString();
+ }
+
// async stuff
private AsyncConditionalWriter acw;
private AsyncConditionalWriter bacw;
@@ -843,7 +876,7 @@
if (checkForAckCollision(pcm)) {
cd.commitObserver.alreadyAcknowledged();
} else {
- cd.commitObserver.commitFailed();
+ cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
}
return;
}
@@ -959,7 +992,7 @@
Futures.addCallback(future, new CommitCallback<Void>(cd) {
@Override
protected void onSuccess(CommitData cd, Void v) throws Exception {
- cd.commitObserver.commitFailed();
+ cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
}
}, env.getSharedResources().getAsyncCommitExecutor());
}
@@ -1111,7 +1144,7 @@
private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus)
throws Exception {
if (mutationStatus != Status.ACCEPTED) {
- cd.commitObserver.commitFailed();
+ cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
} else {
if (stopAfterPrimaryCommit) {
cd.commitObserver.committed();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index db2fefb..646113f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -266,8 +266,8 @@
}
@Override
- public void commitFailed() {
- aco.commitFailed();
+ public void commitFailed(String msg) {
+ aco.commitFailed(msg);
logUnsuccessfulCommit();
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index 05b67e2..895fc21 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -59,7 +59,7 @@
}
@Override
- public void commitFailed() {
+ public void commitFailed(String msg) {
notificationProcessor.requeueNotification(notificationFinder, notification);
}
}