fixes #785 Added meaningful message to commit exception
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java
index 6a1e2ed..4e78f60 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncCommitObserver.java
@@ -41,6 +41,6 @@
   /**
    * Called when async commit of a transaction fails because it overlapped with another transaction
    */
-  void commitFailed();
+  void commitFailed(String msg);
 
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
index 67f465c..403fb66 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
@@ -112,9 +112,9 @@
     }
 
     @Override
-    public void commitFailed() {
+    public void commitFailed(String msg) {
       try {
-        aco.commitFailed();
+        aco.commitFailed(msg);
       } finally {
         finish(TxResult.COMMIT_EXCEPTION);
       }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java b/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java
index 2053c99..314859e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/SyncCommitObserver.java
@@ -27,6 +27,7 @@
   private volatile boolean committed = false;
   private volatile boolean aacked = false;
   private volatile Exception error = null;
+  private volatile String commitFailMsg = "";
 
   @Override
   public void committed() {
@@ -47,8 +48,9 @@
   }
 
   @Override
-  public void commitFailed() {
+  public void commitFailed(String msg) {
     committed = false;
+    commitFailMsg = msg;
     cdl.countDown();
   }
 
@@ -65,7 +67,7 @@
     } else if (aacked) {
       throw new AlreadyAcknowledgedException();
     } else if (!committed) {
-      throw new CommitException();
+      throw new CommitException(commitFailMsg);
     }
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index be375c6..ad36fd2 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -100,7 +100,7 @@
     }
 
     @Override
-    public void commitFailed() {
+    public void commitFailed(String msg) {
       txi = null;
       // retry transaction
       executor.submit(this);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 441517a..e2e6d32 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -77,6 +77,7 @@
 import org.apache.fluo.core.util.ConditionalFlutation;
 import org.apache.fluo.core.util.FluoCondition;
 import org.apache.fluo.core.util.Flutation;
+import org.apache.fluo.core.util.Hex;
 import org.apache.fluo.core.util.SpanUtil;
 
 /**
@@ -408,6 +409,38 @@
       return prow + " " + pcol + " " + pval + " " + rejected.size();
     }
 
+    public String getShortCollisionMessage() {
+      StringBuilder sb = new StringBuilder();
+      if (rejected.size() > 0) {
+        int numCollisions = 0;
+        for (Set<Column> cols : rejected.values()) {
+          numCollisions += cols.size();
+        }
+
+        sb.append("Collsions(");
+        sb.append(numCollisions);
+        sb.append("):");
+
+        String sep = "";
+        outer: for (Entry<Bytes, Set<Column>> entry : rejected.entrySet()) {
+          Bytes row = entry.getKey();
+          for (Column col : entry.getValue()) {
+            sb.append(sep);
+            sep = ", ";
+            Hex.encNonAscii(sb, row);
+            sb.append(" ");
+            Hex.encNonAscii(sb, col, " ");
+            if (sb.length() > 100) {
+              sb.append(" ...");
+              break outer;
+            }
+          }
+        }
+      }
+
+      return sb.toString();
+    }
+
     // async stuff
     private AsyncConditionalWriter acw;
     private AsyncConditionalWriter bacw;
@@ -843,7 +876,7 @@
       if (checkForAckCollision(pcm)) {
         cd.commitObserver.alreadyAcknowledged();
       } else {
-        cd.commitObserver.commitFailed();
+        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
       }
       return;
     }
@@ -959,7 +992,7 @@
     Futures.addCallback(future, new CommitCallback<Void>(cd) {
       @Override
       protected void onSuccess(CommitData cd, Void v) throws Exception {
-        cd.commitObserver.commitFailed();
+        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
       }
     }, env.getSharedResources().getAsyncCommitExecutor());
   }
@@ -1111,7 +1144,7 @@
   private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus)
       throws Exception {
     if (mutationStatus != Status.ACCEPTED) {
-      cd.commitObserver.commitFailed();
+      cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
     } else {
       if (stopAfterPrimaryCommit) {
         cd.commitObserver.committed();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index db2fefb..646113f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -266,8 +266,8 @@
     }
 
     @Override
-    public void commitFailed() {
-      aco.commitFailed();
+    public void commitFailed(String msg) {
+      aco.commitFailed(msg);
       logUnsuccessfulCommit();
     }
 
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index 05b67e2..895fc21 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -59,7 +59,7 @@
     }
 
     @Override
-    public void commitFailed() {
+    public void commitFailed(String msg) {
       notificationProcessor.requeueNotification(notificationFinder, notification);
     }
   }