fixes #978 Made async commit comprehensible (#1001)
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 2d9e2cc..ef13bc8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -27,21 +27,20 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Result;
 import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.ColumnUpdate;
@@ -105,6 +104,9 @@
   private static final Bytes RLOCK_VAL =
       Bytes.of("special rlock value 94da84e7796ff3b23b779805d820a33f1997cb8b");
 
+  // added to avoid findbugs false positive
+  private static final Supplier<Void> NULLS = () -> null;
+
   private static boolean isWrite(Bytes val) {
     return val != NTFY_VAL && val != RLOCK_VAL;
   }
@@ -117,7 +119,7 @@
     return val == RLOCK_VAL;
   }
 
-  private static enum TxStatus {
+  private enum TxStatus {
     OPEN, COMMIT_STARTED, COMMITTED, CLOSED
   }
 
@@ -136,10 +138,6 @@
   private TxStatus status = TxStatus.OPEN;
   private boolean commitAttempted = false;
 
-  // for testing
-  private boolean stopAfterPreCommit = false;
-  private boolean stopAfterPrimaryCommit = false;
-
   public TransactionImpl(Environment env, Notification trigger, long startTs) {
     Objects.requireNonNull(env, "environment cannot be null");
     Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative");
@@ -522,12 +520,13 @@
       checkIfOpen();
       status = TxStatus.COMMIT_STARTED;
       commitAttempted = true;
-
-      stopAfterPreCommit = true;
     }
 
     SyncCommitObserver sco = new SyncCommitObserver();
-    beginCommitAsync(cd, sco, primary);
+    cd = setUpBeginCommitAsync(cd, sco, primary);
+    if (cd != null) {
+      beginCommitAsyncTest(cd);
+    }
     try {
       sco.waitForCommit();
     } catch (AlreadyAcknowledgedException e) {
@@ -554,7 +553,7 @@
    *
    * @param cd Commit data
    */
-  private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> locksSeen) throws Exception {
+  private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> locksSeen) {
     // TODO make async
     // TODO need to keep track of ranges read (not ranges passed in, but actual data read... user
     // may not iterate over entire range
@@ -619,14 +618,13 @@
 
     if (rowColsToCheck.size() > 0) {
 
-      long startTime = System.currentTimeMillis();
       long waitTime = SnapshotScanner.INITIAL_WAIT_TIME;
 
       boolean resolved = false;
 
       List<Entry<Key, Value>> openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
 
-      startTime = System.currentTimeMillis();
+      long startTime = System.currentTimeMillis();
 
       while (!resolved) {
         resolved = LockResolver.resolveLocks(env, startTs, stats, openReadLocks, startTime);
@@ -701,23 +699,6 @@
     return false;
   }
 
-  @VisibleForTesting
-  public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
-    stopAfterPrimaryCommit = true;
-
-    SyncCommitObserver sco = new SyncCommitObserver();
-    cd.commitObserver = sco;
-    try {
-      beginSecondCommitPhase(cd, commitStamp);
-      sco.waitForCommit();
-    } catch (CommitException e) {
-      return false;
-    } catch (Exception e) {
-      throw new FluoException(e);
-    }
-    return true;
-  }
-
   public CommitData createCommitData() {
     CommitData cd = new CommitData();
     cd.cw = env.getSharedResources().getConditionalWriter();
@@ -728,9 +709,8 @@
 
   @Override
   public synchronized void commit() throws CommitException {
-    SyncCommitObserver sco = null;
     try {
-      sco = new SyncCommitObserver();
+      SyncCommitObserver sco = new SyncCommitObserver();
       commitAsync(sco);
       sco.waitForCommit();
     } finally {
@@ -807,7 +787,7 @@
 
   // CHECKSTYLE:OFF
   @Override
-  protected void finalize() throws Throwable {
+  protected void finalize() {
     // CHECKSTYLE:ON
     // TODO Log an error if transaction is not closed (See FLUO-486)
     close(false);
@@ -818,34 +798,6 @@
     return startTs;
   }
 
-  /**
-   * Funcitonal interface to provide next step of asynchronous commit on successful completion of
-   * the previous one
-   */
-  private static interface OnSuccessInterface<V> {
-    public void onSuccess(V result) throws Exception;
-  }
-
-  private abstract static class SynchronousCommitTask implements Runnable {
-
-    private CommitData cd;
-
-    SynchronousCommitTask(CommitData cd) {
-      this.cd = cd;
-    }
-
-    protected abstract void runCommitStep(CommitData cd) throws Exception;
-
-    @Override
-    public void run() {
-      try {
-        runCommitStep(cd);
-      } catch (Exception e) {
-        cd.commitObserver.failed(e);
-      }
-    }
-  }
-
   @Override
   public int getSize() {
     // TODO could calculate as items are added/set
@@ -874,25 +826,6 @@
     return size;
   }
 
-  private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
-      OnSuccessInterface<V> onSuccessInterface) {
-    cfuture.handleAsync((result, exception) -> {
-      if (exception != null) {
-        cd.commitObserver.failed(exception);
-        return null;
-      } else {
-        try {
-          onSuccessInterface.onSuccess(result);
-          return null;
-        } catch (Exception e) {
-          cd.commitObserver.failed(e);
-          return null;
-        }
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
-  }
-
-  // TODO exception handling!!!!  How?????
   abstract class CommitStep {
     private CommitStep nextStep;
 
@@ -927,13 +860,13 @@
 
   abstract class ConditionalStep extends CommitStep {
 
-    CommitData cd;
-
     public abstract Collection<ConditionalMutation> createMutations(CommitData cd);
 
-    public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results);
+    public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+        throws Exception;
 
-    public abstract boolean processResults(CommitData cd, Iterator<Result> results);
+    public abstract boolean processResults(CommitData cd, Iterator<Result> results)
+        throws Exception;
 
     public AsyncConditionalWriter getACW(CommitData cd) {
       return cd.acw;
@@ -942,7 +875,6 @@
     @Override
     CompletableFuture<Boolean> getMainOp(CommitData cd) {
       // TODO not sure threading is correct
-      // TODO handle unknown
       Executor ace = env.getSharedResources().getAsyncCommitExecutor();
       return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
         // ugh icky that this is an iterator, forces copy to inspect.. could refactor async CW to
@@ -951,17 +883,32 @@
         Iterators.addAll(resultsList, results);
         boolean containsUknown = false;
         for (Result result : resultsList) {
-          containsUknown |= result.getStatus() == Status.UNKNOWN;
+          try {
+            containsUknown |= result.getStatus() == Status.UNKNOWN;
+          } catch (Exception e) {
+            throw new CompletionException(e);
+          }
         }
-
         if (containsUknown) {
           // process unknown in sync executor
           Executor se = env.getSharedResources().getSyncCommitExecutor();
-          return CompletableFuture.supplyAsync(() -> handleUnknown(cd, resultsList.iterator()), se);
+          return CompletableFuture.supplyAsync(() -> {
+            try {
+              return handleUnknown(cd, resultsList.iterator());
+            } catch (Exception e) {
+              throw new CompletionException(e);
+            }
+          }, se);
         } else {
           return CompletableFuture.completedFuture(resultsList.iterator());
         }
-      }).thenApplyAsync(results -> processResults(cd, results), ace);
+      }).thenApplyAsync(results -> {
+        try {
+          return processResults(cd, results);
+        } catch (Exception e) {
+          throw new CompletionException(e);
+        }
+      }, ace);
     }
 
 
@@ -976,7 +923,8 @@
     }
 
     @Override
-    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results) {
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+        throws Exception {
 
       Result result = Iterators.getOnlyElement(results);
       Status mutationStatus = result.getStatus();
@@ -986,14 +934,20 @@
 
         switch (txInfo.status) {
           case LOCKED:
-            return Collections.singleton(new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer())).iterator();
+            return Collections
+                .singleton(
+                    new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer()))
+                .iterator();
           case ROLLED_BACK:
-            return Collections.singleton(new Result(Status.REJECTED, result.getMutation(), result.getTabletServer())).iterator();
+            return Collections
+                .singleton(
+                    new Result(Status.REJECTED, result.getMutation(), result.getTabletServer()))
+                .iterator();
           case UNKNOWN:
             // TODO async
             Result newResult = cd.cw.write(result.getMutation());
             mutationStatus = newResult.getStatus();
-            if(mutationStatus != Status.UNKNOWN) {
+            if (mutationStatus != Status.UNKNOWN) {
               return Collections.singleton(newResult).iterator();
             }
             // TODO handle case were data other tx has lock
@@ -1006,26 +960,30 @@
         }
       }
 
-      //TODO
+      // TODO
       throw new IllegalStateException();
     }
 
     @Override
-    public boolean processResults(CommitData cd, Iterator<Result> results) {
+    public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
       Result result = Iterators.getOnlyElement(results);
       return result.getStatus() == Status.ACCEPTED;
     }
 
     @Override
     CompletableFuture<Void> getFailureOp(CommitData cd) {
-      //TODO can this be simplified by pushing some code to the superclass?
+      // TODO can this be simplified by pushing some code to the superclass?
       return CompletableFuture.supplyAsync(() -> {
-        ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd));
+        final ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd));
 
         cd.addPrimaryToRejected();
         getStats().setRejected(cd.getRejected());
         // TODO do async
-        checkForOrphanedLocks(cd);
+        try {
+          checkForOrphanedLocks(cd);
+        } catch (Exception e) {
+          throw new CompletionException(e);
+        }
         if (checkForAckCollision(pcm)) {
           cd.commitObserver.alreadyAcknowledged();
         } else {
@@ -1041,6 +999,12 @@
   class LockOtherStep extends ConditionalStep {
 
     @Override
+    public AsyncConditionalWriter getACW(CommitData cd) {
+      return cd.bacw;
+    }
+
+
+    @Override
     public Collection<ConditionalMutation> createMutations(CommitData cd) {
 
       ArrayList<ConditionalMutation> mutations = new ArrayList<>();
@@ -1072,7 +1036,7 @@
     }
 
     @Override
-    public boolean processResults(CommitData cd, Iterator<Result> results) {
+    public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
 
       while (results.hasNext()) {
         Result result = results.next();
@@ -1092,13 +1056,15 @@
     CompletableFuture<Void> getFailureOp(CommitData cd) {
       return CompletableFuture.supplyAsync(() -> {
         getStats().setRejected(cd.getRejected());
-        checkForOrphanedLocks(cd);
+        try {
+          // Does this need to be async?
+          checkForOrphanedLocks(cd);
+        } catch (Exception e) {
+          throw new CompletionException(e);
+        }
         return null;
       }, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v -> rollbackLocks(cd));
-
-
     }
-
   }
 
   abstract class BatchWriterStep extends CommitStep {
@@ -1119,23 +1085,130 @@
 
 
   private CompletableFuture<Void> rollbackLocks(CommitData cd) {
-    // TODO
-    return null;
+    CommitStep firstStep = new RollbackOtherLocks();
+    firstStep.andThen(new RollbackPrimaryLock());
+
+    return firstStep.compose(cd)
+        .thenRun(() -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
+
+  }
+
+
+  class RollbackOtherLocks extends BatchWriterStep {
+
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      // roll back locks
+
+      // TODO let rollback be done lazily? this makes GC more difficult
+
+      Flutation m;
+
+      ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
+      for (Bytes row : cd.acceptedRows) {
+        m = new Flutation(env, row);
+        for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
+          if (isReadLock(entry.getValue())) {
+            m.put(entry.getKey(),
+                ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+                DelReadLockValue.encodeRollback());
+          } else {
+            m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+                DelLockValue.encodeRollback(false, true));
+          }
+        }
+        mutations.add(m);
+      }
+
+      return mutations;
+    }
+  }
+
+  class RollbackPrimaryLock extends BatchWriterStep {
+
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      // mark transaction as complete for garbage collection purposes
+      Flutation m = new Flutation(env, cd.prow);
+
+      m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+          DelLockValue.encodeRollback(startTs, true, true));
+      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+
+      return Collections.singletonList(m);
+    }
+  }
+
+  class CommittedTestStep extends CommitStep {
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      cd.commitObserver.committed();
+      return CompletableFuture.completedFuture(true);
+    }
+
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      throw new IllegalStateException("Failure not expected");
+    }
+  }
+
+  @VisibleForTesting
+  public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
+
+    SyncCommitObserver sco = new SyncCommitObserver();
+    cd.commitObserver = sco;
+    try {
+      CommitStep firstStep = new GetCommitStampStepTest(commitStamp);
+
+      firstStep.andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep())
+          .andThen(new CommittedTestStep());
+
+      firstStep.compose(cd).exceptionally(throwable -> {
+        cd.commitObserver.failed(throwable);
+        return null;
+      });
+      sco.waitForCommit();
+    } catch (CommitException e) {
+      return false;
+    } catch (Exception e) {
+      throw new FluoException(e);
+    }
+    return true;
   }
 
   class GetCommitStampStep extends CommitStep {
+    protected CompletableFuture<Stamp> getStampOp() {
+      return env.getSharedResources().getOracleClient().getStampAsync();
+    }
 
     @Override
     CompletableFuture<Boolean> getMainOp(CommitData cd) {
-      // TODO Auto-generated method stub
-      // TODO set commitTs on commit data
-      return null;
+
+      return getStampOp().thenApply(commitStamp -> {
+        if (startTs < commitStamp.getGcTimestamp()) {
+          return false;
+        } else {
+          getStats().setCommitTs(commitStamp.getTxTimestamp());
+          return true;
+        }
+      });
     }
 
     @Override
     CompletableFuture<Void> getFailureOp(CommitData cd) {
-      // TODO Auto-generated method stub
-      return null;
+      return rollbackLocks(cd);
+    }
+
+  }
+
+  class GetCommitStampStepTest extends GetCommitStampStep {
+    private final Stamp testStamp;
+
+    public GetCommitStampStepTest(Stamp testStamp) {
+      this.testStamp = testStamp;
+    }
+
+    @Override
+    protected CompletableFuture<Stamp> getStampOp() {
+      return CompletableFuture.completedFuture(testStamp);
     }
 
   }
@@ -1144,8 +1217,42 @@
 
     @Override
     public Collection<Mutation> createMutations(CommitData cd) {
+      long commitTs = getStats().getCommitTs();
       HashMap<Bytes, Mutation> mutations = new HashMap<>();
-      // TODO copy code from writeNotificationsAsync()
+
+      if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
+        Flutation m = new Flutation(env, cd.prow);
+        Notification.put(env, m, cd.pcol, commitTs);
+        mutations.put(cd.prow, m);
+      }
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+
+        for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+          if (observedColumns.contains(colUpdates.getKey())) {
+            Bytes val = colUpdates.getValue();
+            if (isWrite(val) && !isDelete(val)) {
+              Mutation m = mutations.get(rowUpdates.getKey());
+              if (m == null) {
+                m = new Flutation(env, rowUpdates.getKey());
+                mutations.put(rowUpdates.getKey(), m);
+              }
+              Notification.put(env, m, colUpdates.getKey(), commitTs);
+            }
+          }
+        }
+      }
+
+      for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
+        Mutation m = mutations.get(entry.getKey());
+        if (m == null) {
+          m = new Flutation(env, entry.getKey());
+          mutations.put(entry.getKey(), m);
+        }
+        for (Column col : entry.getValue()) {
+          Notification.put(env, m, col, commitTs);
+        }
+      }
       return mutations.values();
     }
 
@@ -1155,36 +1262,107 @@
 
     @Override
     public Collection<ConditionalMutation> createMutations(CommitData cd) {
-      // TODO Auto-generated method stub
-      return null;
+      long commitTs = getStats().getCommitTs();
+      IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
+      PrewriteIterator.setSnaptime(iterConf, startTs);
+      boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
+
+      Condition lockCheck =
+          new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
+              cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
+      final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
+
+      ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
+          isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
+
+      return Collections.singletonList(delLockMutation);
     }
 
     @Override
-    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results) {
-      // TODO Auto-generated method stub
-      return null;
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+        throws Exception {
+      // the code for handing this is synchronous and needs to be handled in another thread pool
+      // TODO - how do we do the above without return a CF?
+      long commitTs = getStats().getCommitTs();
+      Result result = Iterators.getOnlyElement(results);
+      Status ms = result.getStatus();
+
+      while (ms == Status.UNKNOWN) {
+
+        // TODO async
+        TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
+
+        switch (txInfo.status) {
+          case COMMITTED:
+            if (txInfo.commitTs != commitTs) {
+              throw new IllegalStateException(
+                  cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
+            }
+            ms = Status.ACCEPTED;
+            break;
+          case LOCKED:
+            // TODO async
+            ConditionalMutation delLockMutation = result.getMutation();
+            ms = cd.cw.write(delLockMutation).getStatus();
+            break;
+          default:
+            ms = Status.REJECTED;
+        }
+      }
+      Result newResult = new Result(ms, result.getMutation(), result.getTabletServer());
+      return Collections.singletonList(newResult).iterator();
     }
 
     @Override
-    public boolean processResults(CommitData cd, Iterator<Result> results) {
-      // TODO Auto-generated method stub
-      return false;
+    public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
+      Result result = Iterators.getOnlyElement(results);
+      return result.getStatus() == Status.ACCEPTED;
     }
 
     @Override
     CompletableFuture<Void> getFailureOp(CommitData cd) {
-      // TODO Auto-generated method stub
-      return null;
+      cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
+      return CompletableFuture.completedFuture(NULLS.get());
     }
 
   }
 
+  @VisibleForTesting
+  public boolean finishCommit(CommitData cd, Stamp commitStamp) {
+    getStats().setCommitTs(commitStamp.getTxTimestamp());
+
+    CommitStep firstStep = new DeleteLocksStep();
+    firstStep.andThen(new FinishCommitStep());
+    firstStep.compose(cd).exceptionally(throwable -> {
+      System.err.println("Unexpected exception in finish commit test method : ");
+      throwable.printStackTrace();
+      return null;
+    });
+
+    return true;
+  }
+
+
   class DeleteLocksStep extends BatchWriterStep {
 
     @Override
     public Collection<Mutation> createMutations(CommitData cd) {
-      // TODO Auto-generated method stub
-      return null;
+      long commitTs = getStats().getCommitTs();
+      ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
+      for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+        Flutation m = new Flutation(env, rowUpdates.getKey());
+        boolean isTriggerRow = isTriggerRow(rowUpdates.getKey());
+        for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+          ColumnUtil.commitColumn(env,
+              isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false,
+              colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
+              isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
+        }
+
+        mutations.add(m);
+      }
+
+      return mutations;
     }
 
   }
@@ -1192,9 +1370,33 @@
   class FinishCommitStep extends BatchWriterStep {
 
     @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      return super.getMainOp(cd).thenApply(b -> {
+        Preconditions.checkArgument(b);
+        cd.commitObserver.committed();
+        return true;
+      });
+    }
+
+    @Override
     public Collection<Mutation> createMutations(CommitData cd) {
-      // TODO Auto-generated method stub
-      return null;
+      long commitTs = getStats().getCommitTs();
+      ArrayList<Mutation> afterFlushMutations = new ArrayList<>(2);
+
+      Flutation m = new Flutation(env, cd.prow);
+      // mark transaction as complete for garbage collection purposes
+      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
+      afterFlushMutations.add(m);
+
+      if (weakNotification != null) {
+        afterFlushMutations.add(weakNotification.newDelete(env, startTs));
+      }
+
+      if (notification != null) {
+        afterFlushMutations.add(notification.newDelete(env, startTs));
+      }
+
+      return afterFlushMutations;
     }
 
   }
@@ -1208,21 +1410,23 @@
 
     try {
       CommitData cd = createCommitData();
-      beginCommitAsync(cd, commitCallback, null);
+      cd = setUpBeginCommitAsync(cd, commitCallback, null);
+      if (cd != null) {
+        beginCommitAsync(cd);
+      }
     } catch (Exception e) {
       e.printStackTrace();
       commitCallback.failed(e);
     }
   }
 
-  private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
+  private CommitData setUpBeginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
       RowColumn primary) {
-
     if (updates.size() == 0) {
       // TODO do async
       deleteWeakRow();
       commitCallback.committed();
-      return;
+      return null;
     }
 
     for (Map<Column, Bytes> cols : updates.values()) {
@@ -1257,7 +1461,7 @@
         // there are only read locks, so nothing to write
         deleteWeakRow();
         commitCallback.committed();
-        return;
+        return null;
       }
     }
 
@@ -1272,360 +1476,48 @@
 
     cd.commitObserver = commitCallback;
 
+    return cd;
+  }
+
+  private void beginCommitAsync(CommitData cd) {
+
+    // Notification are written between GetCommitStampStep and CommitPrimaryStep for the following
+    // reasons :
+    // * At this point all columns are locked, this guarantees that anything triggering as a
+    // result of this transaction will see all of this transactions changes.
+    // * The transaction is not yet committed. If the process dies at this point whatever
+    // was running this transaction should rerun and recreate all of the notifications.
+    // The next transactions will rerun because this transaction will have to be rolled back.
+    // * If notifications are written in the 2nd phase of commit, then when the 2nd phase
+    // partially succeeds notifications may never be written. Because in the case of failure
+    // notifications would not be written until a column is read and it may never be read.
+    // See https://github.com/fluo-io/fluo/issues/642
+    //
+    // Its very important the notifications which trigger an observer are deleted after the 2nd
+    // phase of commit finishes.
+
     CommitStep firstStep = new LockPrimaryStep();
 
-    firstStep.andThen(new LockOtherStep())
-        .andThen(new GetCommitStampStep())
-        .andThen(new WriteNotificationsStep())
-        .andThen(new CommitPrimaryStep())
-        .andThen(new DeleteLocksStep())
-        .andThen(new FinishCommitStep());
+    firstStep.andThen(new LockOtherStep()).andThen(new GetCommitStampStep())
+        .andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep())
+        .andThen(new DeleteLocksStep()).andThen(new FinishCommitStep());
 
-    firstStep.compose(cd);
+    firstStep.compose(cd).exceptionally(throwable -> {
+      cd.commitObserver.failed(throwable);
+      return null;
+    });
   }
 
-  private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result result)
-      throws Exception {
-    final Status mutationStatus = result.getStatus();
+  private void beginCommitAsyncTest(CommitData cd) {
 
-    if (mutationStatus == Status.ACCEPTED) {
-      lockOtherColumns(cd);
-    } else {
-      env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) {
-        @Override
-        protected void runCommitStep(CommitData cd) throws Exception {
-          synchronousPostLockPrimary(cd, pcm, mutationStatus);
-        }
-      });
-    }
-  }
+    CommitStep firstStep = new LockPrimaryStep();
 
-  private void synchronousPostLockPrimary(CommitData cd, ConditionalMutation pcm,
-      Status mutationStatus) throws AccumuloException, AccumuloSecurityException, Exception {
-    // TODO convert this code to async
-    while (mutationStatus == Status.UNKNOWN) {
-      TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
+    firstStep.andThen(new LockOtherStep()).andThen(new CommittedTestStep());
 
-      switch (txInfo.status) {
-        case LOCKED:
-          mutationStatus = Status.ACCEPTED;
-          break;
-        case ROLLED_BACK:
-          mutationStatus = Status.REJECTED;
-          break;
-        case UNKNOWN:
-          // TODO async
-          mutationStatus = cd.cw.write(pcm).getStatus();
-          // TODO handle case were data other tx has lock
-          break;
-        case COMMITTED:
-        default:
-          throw new IllegalStateException(
-              "unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol);
-
-      }
-    }
-
-    if (mutationStatus != Status.ACCEPTED) {
-      cd.addPrimaryToRejected();
-      getStats().setRejected(cd.getRejected());
-      // TODO do async
-      checkForOrphanedLocks(cd);
-      if (checkForAckCollision(pcm)) {
-        cd.commitObserver.alreadyAcknowledged();
-      } else {
-        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
-      }
-      return;
-    }
-
-    lockOtherColumns(cd);
-  }
-
-  private void lockOtherColumns(CommitData cd) {
-    ArrayList<ConditionalMutation> mutations = new ArrayList<>();
-
-    for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
-      ConditionalFlutation cm = null;
-
-      for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        if (cm == null) {
-          cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(), cd.prow,
-              cd.pcol, false);
-        } else {
-          prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false);
-        }
-      }
-
-      mutations.add(cm);
-    }
-
-    cd.acceptedRows = new HashSet<>();
-
-    CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
-    addCallback(cfuture, cd, results -> postLockOther(cd, results));
-  }
-
-  private void postLockOther(final CommitData cd, Iterator<Result> results) throws Exception {
-    while (results.hasNext()) {
-      Result result = results.next();
-      // TODO handle unknown?
-      Bytes row = Bytes.of(result.getMutation().getRow());
-      if (result.getStatus() == Status.ACCEPTED) {
-        cd.acceptedRows.add(row);
-      } else {
-        cd.addToRejected(row, updates.get(row).keySet());
-      }
-    }
-
-    if (cd.getRejected().size() > 0) {
-      getStats().setRejected(cd.getRejected());
-      env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) {
-        @Override
-        protected void runCommitStep(CommitData cd) throws Exception {
-          checkForOrphanedLocks(cd);
-          rollbackOtherLocks(cd);
-        }
-      });
-    } else if (stopAfterPreCommit) {
-      cd.commitObserver.committed();
-    } else {
-      CompletableFuture<Stamp> cfuture = env.getSharedResources().getOracleClient().getStampAsync();
-      addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
-    }
-  }
-
-  private void rollbackOtherLocks(CommitData cd) throws Exception {
-    // roll back locks
-
-    // TODO let rollback be done lazily? this makes GC more difficult
-
-    Flutation m;
-
-    ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
-    for (Bytes row : cd.acceptedRows) {
-      m = new Flutation(env, row);
-      for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
-        if (isReadLock(entry.getValue())) {
-          m.put(entry.getKey(), ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
-              DelReadLockValue.encodeRollback());
-        } else {
-          m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
-              DelLockValue.encodeRollback(false, true));
-        }
-      }
-      mutations.add(m);
-    }
-
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
-  }
-
-  private void rollbackPrimaryLock(CommitData cd) throws Exception {
-
-    // mark transaction as complete for garbage collection purposes
-    Flutation m = new Flutation(env, cd.prow);
-
-    m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
-        DelLockValue.encodeRollback(startTs, true, true));
-    m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
-
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
-    addCallback(cfuture, cd,
-        result -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
-  }
-
-  private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception {
-    if (startTs < commitStamp.getGcTimestamp()) {
-      rollbackOtherLocks(cd);
-    } else {
-      // Notification are written here for the following reasons :
-      // * At this point all columns are locked, this guarantees that anything triggering as a
-      // result of this transaction will see all of this transactions changes.
-      // * The transaction is not yet committed. If the process dies at this point whatever
-      // was running this transaction should rerun and recreate all of the notifications.
-      // The next transactions will rerun because this transaction will have to be rolled back.
-      // * If notifications are written in the 2nd phase of commit, then when the 2nd phase
-      // partially succeeds notifications may never be written. Because in the case of failure
-      // notifications would not be written until a column is read and it may never be read.
-      // See https://github.com/fluo-io/fluo/issues/642
-      //
-      // Its very important the notifications which trigger an observer are deleted after the 2nd
-      // phase of commit finishes.
-      getStats().setCommitTs(commitStamp.getTxTimestamp());
-      writeNotificationsAsync(cd, commitStamp.getTxTimestamp());
-    }
-  }
-
-  private void writeNotificationsAsync(CommitData cd, final long commitTs) {
-
-    HashMap<Bytes, Mutation> mutations = new HashMap<>();
-
-    if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
-      Flutation m = new Flutation(env, cd.prow);
-      Notification.put(env, m, cd.pcol, commitTs);
-      mutations.put(cd.prow, m);
-    }
-
-    for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
-
-      for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        if (observedColumns.contains(colUpdates.getKey())) {
-          Bytes val = colUpdates.getValue();
-          if (isWrite(val) && !isDelete(val)) {
-            Mutation m = mutations.get(rowUpdates.getKey());
-            if (m == null) {
-              m = new Flutation(env, rowUpdates.getKey());
-              mutations.put(rowUpdates.getKey(), m);
-            }
-            Notification.put(env, m, colUpdates.getKey(), commitTs);
-          }
-        }
-      }
-    }
-
-    for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
-      Mutation m = mutations.get(entry.getKey());
-      if (m == null) {
-        m = new Flutation(env, entry.getKey());
-        mutations.put(entry.getKey(), m);
-      }
-      for (Column col : entry.getValue()) {
-        Notification.put(env, m, col, commitTs);
-      }
-    }
-
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
-    addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
-  }
-
-  private void commmitPrimary(CommitData cd, final long commitTs) {
-    // try to delete lock and add write for primary column
-    IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
-    PrewriteIterator.setSnaptime(iterConf, startTs);
-    boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
-
-    Condition lockCheck =
-        new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
-            cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
-    final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
-
-    ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
-        isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
-
-    CompletableFuture<Iterator<Result>> cfuture =
-        cd.acw.apply(Collections.singletonList(delLockMutation));
-    addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
-        Iterators.getOnlyElement(result)));
-  }
-
-  private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs,
-      final ConditionalMutation delLockMutation, Result result) throws Exception {
-
-    final Status mutationStatus = result.getStatus();
-    if (mutationStatus == Status.UNKNOWN) {
-      // the code for handing this is synchronous and needs to be handled in another thread pool
-      Runnable task = new SynchronousCommitTask(cd) {
-        @Override
-        protected void runCommitStep(CommitData cd) throws Exception {
-
-          Status ms = mutationStatus;
-
-          while (ms == Status.UNKNOWN) {
-
-            // TODO async
-            TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
-
-            switch (txInfo.status) {
-              case COMMITTED:
-                if (txInfo.commitTs != commitTs) {
-                  throw new IllegalStateException(
-                      cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
-                }
-                ms = Status.ACCEPTED;
-                break;
-              case LOCKED:
-                // TODO async
-                ms = cd.cw.write(delLockMutation).getStatus();
-                break;
-              default:
-                ms = Status.REJECTED;
-            }
-          }
-
-          postCommitPrimary(cd, commitTs, ms);
-        }
-      };
-
-      env.getSharedResources().getSyncCommitExecutor().execute(task);
-    } else {
-      postCommitPrimary(cd, commitTs, mutationStatus);
-    }
-  }
-
-  private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus)
-      throws Exception {
-    if (mutationStatus != Status.ACCEPTED) {
-      cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
-    } else {
-      if (stopAfterPrimaryCommit) {
-        cd.commitObserver.committed();
-      } else {
-        deleteLocks(cd, commitTs);
-      }
-    }
-  }
-
-  private void deleteLocks(CommitData cd, final long commitTs) {
-    // delete locks and add writes for other columns
-    ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
-    for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
-      Flutation m = new Flutation(env, rowUpdates.getKey());
-      boolean isTriggerRow = isTriggerRow(rowUpdates.getKey());
-      for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        ColumnUtil.commitColumn(env,
-            isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false,
-            colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
-            isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
-      }
-
-      mutations.add(m);
-    }
-
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    addCallback(cfuture, cd, result -> finishCommit(cd, commitTs));
-  }
-
-  @VisibleForTesting
-  public boolean finishCommit(CommitData cd, Stamp commitStamp)
-      throws TableNotFoundException, MutationsRejectedException {
-    deleteLocks(cd, commitStamp.getTxTimestamp());
-    return true;
-  }
-
-  private void finishCommit(CommitData cd, long commitTs) {
-    ArrayList<Mutation> afterFlushMutations = new ArrayList<>(2);
-
-    Flutation m = new Flutation(env, cd.prow);
-    // mark transaction as complete for garbage collection purposes
-    m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
-    afterFlushMutations.add(m);
-
-    if (weakNotification != null) {
-      afterFlushMutations.add(weakNotification.newDelete(env, startTs));
-    }
-
-    if (notification != null) {
-      afterFlushMutations.add(notification.newDelete(env, startTs));
-    }
-
-    env.getSharedResources().getBatchWriter().writeMutationsAsync(afterFlushMutations);
-
-    cd.commitObserver.committed();
+    firstStep.compose(cd).exceptionally(throwable -> {
+      cd.commitObserver.failed(throwable);
+      return null;
+    });
   }
 
   public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index a175500..02ed35d 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -20,10 +20,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
@@ -124,12 +121,11 @@
     tx.close();
   }
 
-  public CommitData createCommitData() throws TableNotFoundException {
+  public CommitData createCommitData() {
     return tx.createCommitData();
   }
 
-  public boolean preCommit(CommitData cd) throws AlreadyAcknowledgedException,
-      TableNotFoundException, AccumuloException, AccumuloSecurityException {
+  public boolean preCommit(CommitData cd) throws AlreadyAcknowledgedException {
     return tx.preCommit(cd);
   }
 
@@ -137,13 +133,11 @@
     return tx.preCommit(cd, primary);
   }
 
-  public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp)
-      throws AccumuloException, AccumuloSecurityException {
+  public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
     return tx.commitPrimaryColumn(cd, commitStamp);
   }
 
-  public void finishCommit(CommitData cd, Stamp commitStamp)
-      throws MutationsRejectedException, TableNotFoundException {
+  public void finishCommit(CommitData cd, Stamp commitStamp) {
     tx.finishCommit(cd, commitStamp);
     env.getSharedResources().getBatchWriter().waitForAsyncFlush();
   }