Accord support for live migration

Patch by Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-18129
diff --git a/accord-core/build.gradle b/accord-core/build.gradle
index 46b2930..877bac6 100644
--- a/accord-core/build.gradle
+++ b/accord-core/build.gradle
@@ -52,6 +52,7 @@
         exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
         exclude group: 'org.slf4j', module: 'jcl-over-slf4j'
     }
+    testImplementation group: 'org.awaitility', name: 'awaitility', version: '4.2.0'
 }
 
 task burn(type: JavaExec) {
diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java
index ff8ab04..4a4460d 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -18,6 +18,8 @@
 
 package accord.api;
 
+import javax.annotation.Nonnull;
+
 import accord.local.Command;
 import accord.local.Node;
 import accord.primitives.Ranges;
@@ -52,6 +54,16 @@
 
     void onFailedBootstrap(String phase, Ranges ranges, Runnable retry, Throwable failure);
 
+    /**
+     * Invoked with the keys (but not ranges) that have all dependent transactions in the applied
+     * state at this node as of some TxnId. No guarantees are made about other nodes.
+     *
+     * Useful for migrations to/from Accord where you want to know there are no in flight
+     * transactions in Accord that might still execute, and that it is safe to read
+     * outside of Accord.
+     */
+    default void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull Timestamp executeAt) {}
+
     @Override
     void onUncaughtException(Throwable t);
 
diff --git a/accord-core/src/main/java/accord/api/BarrierType.java b/accord-core/src/main/java/accord/api/BarrierType.java
new file mode 100644
index 0000000..864a41c
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/BarrierType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+public enum BarrierType
+{
+    // Only wait until the barrier is achieved locally, and possibly don't trigger the barrier remotely.
+    // Local barriers are only on the `minEpoch` provided and have utility limited to establishing
+    // no more transactions will occur in an earlier before minEpoch
+    local(false, true),
+    // Wait until the barrier has been achieved at a quorum globally
+    global_sync(true, false),
+    // Trigger the global barrier, but only block on creation of the barrier and local application
+    global_async(true, true);
+
+    public final boolean global;
+    public final boolean async;
+
+    BarrierType(boolean global, boolean async)
+    {
+        this.global = global;
+        this.async = async;
+    }
+}
diff --git a/accord-core/src/main/java/accord/api/Query.java b/accord-core/src/main/java/accord/api/Query.java
index af495e6..f215d69 100644
--- a/accord-core/src/main/java/accord/api/Query.java
+++ b/accord-core/src/main/java/accord/api/Query.java
@@ -18,11 +18,13 @@
 
 package accord.api;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 
-import javax.annotation.Nullable;
-
 /**
  * The computational/transformation part of a client query
  */
@@ -31,6 +33,9 @@
     /**
      * Perform some transformation on the complete {@link Data} result of a {@link Read}
      * from some {@link DataStore}, to produce a {@link Result} to return to the client.
+     *
+     * executeAt timestamp is provided so that the result of the transaction can be determined based
+     * on configuration at that epoch. This will be deterministic even if the transaction is recovered.
      */
-    Result compute(TxnId txnId, Timestamp executeAt, @Nullable Data data, @Nullable Read read, @Nullable Update update);
+    Result compute(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Seekables<?, ?> keys, @Nullable Data data, @Nullable Read read, @Nullable Update update);
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java b/accord-core/src/main/java/accord/coordinate/Barrier.java
new file mode 100644
index 0000000..39e56de
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Barrier.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nonnull;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.BarrierType;
+import accord.api.Key;
+import accord.api.RoutingKey;
+import accord.local.Command;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
+import accord.local.SafeCommandStore.TestTimestamp;
+import accord.local.Status;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.MapReduceConsume;
+import accord.utils.async.AsyncResults;
+
+import static accord.local.PreLoadContext.contextFor;
+import static accord.utils.Invariants.checkArgument;
+import static accord.utils.Invariants.checkState;
+import static accord.utils.async.AsyncChains.getUninterruptibly;
+
+/**
+ * Local or global barriers that return a result once all transactions have their side effects visible.
+ *
+ * For local barriers the epoch is the only guarantee, but for global barriers a new transaction is created so it will
+ * be as of the timestamp of the created transaction.
+ *
+ * Note that reads might still order after, but side effect bearing transactions should not.
+ */
+public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractResult<Timestamp>
+{
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(Barrier.class);
+
+    private final Node node;
+
+    private final S seekables;
+
+    private final long minEpoch;
+    private final BarrierType barrierType;
+
+    @VisibleForTesting
+    CoordinateSyncPoint<S> coordinateSyncPoint;
+    @VisibleForTesting
+    ExistingTransactionCheck existingTransactionCheck;
+
+    Barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType)
+    {
+        checkArgument(keysOrRanges.domain() == Domain.Key || barrierType.global, "Ranges are only supported with global barriers");
+        checkArgument(keysOrRanges.size() == 1 || barrierType.global, "Only a single key is supported with local barriers");
+        this.node = node;
+        this.minEpoch = minEpoch;
+        this.seekables = keysOrRanges;
+        this.barrierType = barrierType;
+    }
+
+    public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType)
+    {
+        Barrier<S> barrier = new Barrier(node, keysOrRanges, minEpoch, barrierType);
+        barrier.start();
+        return barrier;
+    }
+
+    private void start()
+    {
+        // It may be possible to use local state to determine that the barrier is already satisfied or
+        // there is an existing transaction we can wait on
+        if (!barrierType.global)
+        {
+            existingTransactionCheck = checkForExistingTransaction();
+            existingTransactionCheck.addCallback((barrierTxn, existingTransactionCheckFailure) -> {
+                if (existingTransactionCheckFailure != null)
+                {
+                    Barrier.this.tryFailure(existingTransactionCheckFailure);
+                    return;
+                }
+
+                if (barrierTxn != null)
+                {
+                    if (barrierTxn.status.equals(Status.Applied))
+                    {
+                        doBarrierSuccess(barrierTxn.executeAt);
+                    }
+                    // A listener was added to the transaction already
+                }
+                else
+                {
+                    createSyncPoint();
+                }
+            });
+        }
+        else
+        {
+            createSyncPoint();
+        }
+    }
+
+    private void doBarrierSuccess(Timestamp executeAt)
+    {
+        // The transaction we wait on might have more keys, but it's not
+        // guaranteed they were wanted and we don't want to force the agent to filter them
+        // so provide the seekables we were given.
+        // We also don't notify the agent for range barriers since that makes sense as a local concept
+        // since ranges can easily span multiple nodes
+        if (seekables != null)
+            node.agent().onLocalBarrier(seekables, executeAt);
+        Barrier.this.trySuccess(executeAt);
+    }
+
+    private void createSyncPoint()
+    {
+        try
+        {
+            coordinateSyncPoint = getUninterruptibly(CoordinateSyncPoint.inclusive(node, seekables, barrierType.async));
+        }
+        catch (ExecutionException e)
+        {
+            tryFailure(e);
+        }
+        coordinateSyncPoint.addCallback((syncPoint, syncPointFailure) -> {
+            if (syncPointFailure != null)
+            {
+                Barrier.this.tryFailure(syncPointFailure);
+                return;
+            }
+
+            // Need to wait for the local transaction to finish since coordinate sync point won't wait on anything
+            // if async was requested or there were no deps found
+            if (syncPoint.finishedAsync)
+            {
+                TxnId txnId = syncPoint.syncId;
+                long epoch = txnId.epoch();
+                RoutingKey homeKey = syncPoint.homeKey;
+                node.commandStores().ifLocal(contextFor(txnId), homeKey, epoch, epoch,
+                                             safeStore -> safeStore.get(txnId, homeKey).addAndInvokeListener(safeStore, new BarrierCommandListener()))
+                    .begin(node.agent());
+            }
+            else
+            {
+                doBarrierSuccess(syncPoint.syncId);
+            }
+        });
+    }
+
+    private class BarrierCommandListener implements Command.TransientListener
+    {
+        @Override
+        public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
+        {
+            Command command = safeCommand.current();
+            if (command.is(Status.Applied))
+            {
+                Timestamp executeAt = command.executeAt();
+                // In all the cases where we add a listener (listening to existing command, async completion of CoordinateSyncPoint)
+                // we want to notify the agent
+                doBarrierSuccess(executeAt);
+            }
+            else if (command.hasBeen(Status.Truncated))
+                // Surface the invalidation/truncation and the barrier can be retried by the caller
+                Barrier.this.tryFailure(new Timeout(command.txnId(), command.homeKey()));
+
+        }
+
+        @Override
+        public PreLoadContext listenerPreLoadContext(TxnId caller)
+        {
+            return PreLoadContext.contextFor(caller);
+        }
+    }
+
+    private ExistingTransactionCheck checkForExistingTransaction()
+    {
+        checkState(seekables.size() == 1 && seekables.domain() == Domain.Key);
+        ExistingTransactionCheck check = new ExistingTransactionCheck();
+        Key k = seekables.get(0).asKey();
+        node.commandStores().mapReduceConsume(
+                contextFor(k),
+                k.toUnseekable(),
+                minEpoch,
+                Long.MAX_VALUE,
+                check);
+        return check;
+    }
+
+    // Hold result of looking for a transaction to act as a barrier for an Epoch
+    static class BarrierTxn
+    {
+        @Nonnull
+        public final TxnId txnId;
+        @Nonnull
+        public final Timestamp executeAt;
+        @Nonnull
+        public final Status status;
+        @Nonnull
+        public final Key key;
+        public BarrierTxn(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Status status, Key key)
+        {
+            this.txnId = txnId;
+            this.executeAt = executeAt;
+            this.status = status;
+            this.key = key;
+        }
+
+        public BarrierTxn max(BarrierTxn other)
+        {
+            if (other == null)
+                return this;
+            return status.compareTo(other.status) >= 0 ? this : other;
+        }
+    }
+
+    /*
+     * Check for an existing transaction that is either already Applied (dependencies were applied)
+     * or PreApplied (outcome fixed, dependencies not yet applied).
+     *
+     * For Applied we can return success immediately with the executeAt epoch. For PreApplied we can add
+     * a listener for when it transitions to Applied and then return success.
+     */
+    class ExistingTransactionCheck extends AsyncResults.AbstractResult<BarrierTxn> implements MapReduceConsume<SafeCommandStore, BarrierTxn>
+    {
+        @Override
+        public BarrierTxn apply(SafeCommandStore safeStore)
+        {
+            BarrierTxn found = safeStore.mapReduceWithTerminate(
+                    seekables,
+                    safeStore.ranges().allAfter(minEpoch),
+                    // Barriers are trying to establish that committed transactions are applied before the barrier (or in this case just minEpoch)
+                    // so all existing transaction types should ensure that at this point. An earlier txnid may have an executeAt that is after
+                    // this barrier or the transaction we listen on and that is fine
+                    TestKind.Any,
+                    TestTimestamp.EXECUTES_AFTER,
+                    TxnId.minForEpoch(minEpoch),
+                    TestDep.ANY_DEPS,
+                    null,
+                    Status.Committed,
+                    Status.Applied,
+                    (keyOrRange, txnId, executeAt, status, barrierTxn) -> {
+                        if (keyOrRange.domain() == Domain.Key)
+                            return new BarrierTxn(txnId, executeAt, status, keyOrRange.asKey());
+                        return null;
+                    },
+                    null,
+                    // Take the first one we find, and call it good enough to wait on
+                    barrierTxn -> barrierTxn != null);
+            // It's not applied so add a listener to find out when it is applied
+            if (found != null && !found.status.equals(Status.Applied))
+            {
+                safeStore.commandStore().execute(
+                        contextFor(found.txnId),
+                        safeStoreWithTxn -> safeStoreWithTxn.get(found.txnId, found.key.toUnseekable()).addAndInvokeListener(safeStore, new BarrierCommandListener())
+                ).begin(node.agent());
+            }
+            return found;
+        }
+
+        @Override
+        public BarrierTxn reduce(BarrierTxn o1, BarrierTxn o2)
+        {
+            throw new IllegalStateException("Should not be possible to find multiple transactions");
+        }
+
+        @Override
+        public void accept(BarrierTxn result, Throwable failure)
+        {
+            if (failure != null)
+            {
+                ExistingTransactionCheck.this.tryFailure(failure);
+                return;
+            }
+
+            // Will need to create a transaction
+            ExistingTransactionCheck.this.trySuccess(result);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java b/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java
new file mode 100644
index 0000000..f8b030c
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.function.BiConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Result;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.coordinate.tracking.RequestStatus;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Callback;
+import accord.messages.Commit;
+import accord.messages.ReadData.ReadNack;
+import accord.messages.ReadData.ReadReply;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.RequestStatus.Failed;
+
+/**
+ * Block on deps at quorum for a sync point transaction, and then move the transaction to the applied state
+ */
+public class BlockOnDeps implements Callback<ReadReply>
+{
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(BlockOnDeps.class);
+
+    final Node node;
+    final TxnId txnId;
+    final Txn txn;
+    final FullRoute<?> route;
+    final Deps deps;
+    final Topologies blockOn;
+    final QuorumTracker tracker;
+    final BiConsumer<? super Result, Throwable> callback;
+
+    private boolean isDone = false;
+
+    private BlockOnDeps(Node node, TxnId txnId, Txn txn, FullRoute<?> route,  Deps deps, BiConsumer<? super Result, Throwable> callback)
+    {
+        this.node = node;
+        this.txnId = txnId;
+        this.txn = txn;
+        this.route = route;
+        this.deps = deps;
+        // Sync points don't propose anything so they can execute at their txnId epoch
+        this.blockOn = node.topology().forEpoch(route, txnId.epoch());
+        this.tracker = new QuorumTracker(blockOn);
+        this.callback = callback;
+    }
+
+    public static void blockOnDeps(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, BiConsumer<? super Result, Throwable> callback)
+    {
+        BlockOnDeps blockOnDeps = new BlockOnDeps(node, txnId, txn, route, deps, callback);
+        blockOnDeps.start();
+    }
+
+    void start()
+    {
+        Commit.commitMaximalAndBlockOnDeps(node, blockOn, txnId, txn, route, deps, this);
+    }
+
+    @Override
+    public void onSuccess(Id from, ReadReply reply)
+    {
+        if (isDone)
+            return;
+
+        if (reply.isOk())
+        {
+            if (tracker.recordSuccess(from) == RequestStatus.Success)
+            {
+                isDone = true;
+                callback.accept(txn.result(txnId, txnId, null), null);
+            }
+            return;
+        }
+
+        ReadNack nack = (ReadNack) reply;
+        switch (nack)
+        {
+            default: throw new IllegalStateException();
+            case Redundant:
+                // WaitUntilApplied only sends Redundant on truncation which implies durable and applied
+                isDone = true;
+                callback.accept(txn.result(txnId, txnId, null), null);
+                break;
+            case NotCommitted:
+                throw new IllegalStateException("Received `NotCommitted` response after sending maximal commit as part of `BlockOnDeps`");
+            case Invalid:
+                onFailure(from, new IllegalStateException("Submitted a read command to a replica that did not own the range"));
+                break;
+        }
+    }
+
+    @Override
+    public void onFailure(Id from, Throwable failure)
+    {
+        if (tracker.recordFailure(from) == Failed)
+        {
+            isDone = true;
+            callback.accept(null, failure);
+        }
+    }
+
+    @Override
+    public void onCallbackFailure(Id from, Throwable failure)
+    {
+        isDone = true;
+        callback.accept(null, failure);
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index 1b5b39b..6632f82 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -25,6 +25,7 @@
 import accord.messages.ReadData;
 import accord.messages.SetShardDurable;
 import accord.messages.WaitUntilApplied;
+import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
 import accord.topology.Topologies;
 import accord.utils.async.AsyncResult;
@@ -37,18 +38,18 @@
 {
     final Node node;
     final AppliedTracker tracker;
-    final SyncPoint exclusiveSyncPoint;
+    final SyncPoint<Ranges> exclusiveSyncPoint;
 
-    private CoordinateShardDurable(Node node, SyncPoint exclusiveSyncPoint)
+    private CoordinateShardDurable(Node node, SyncPoint<Ranges> exclusiveSyncPoint)
     {
         // TODO (required): this isn't correct, we need to potentially perform a second round if a dependency executes in a future epoch and we have lost ownership of that epoch
-        Topologies topologies = node.topology().forEpoch(exclusiveSyncPoint.ranges, exclusiveSyncPoint.sourceEpoch());
+        Topologies topologies = node.topology().forEpoch(exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.sourceEpoch());
         this.node = node;
         this.tracker = new AppliedTracker(topologies);
         this.exclusiveSyncPoint = exclusiveSyncPoint;
     }
 
-    public static AsyncResult<Void> coordinate(Node node, SyncPoint exclusiveSyncPoint)
+    public static AsyncResult<Void> coordinate(Node node, SyncPoint<Ranges> exclusiveSyncPoint)
     {
         CoordinateShardDurable coordinate = new CoordinateShardDurable(node, exclusiveSyncPoint);
         coordinate.start();
@@ -57,7 +58,7 @@
 
     private void start()
     {
-        node.send(tracker.nodes(), to -> new WaitUntilApplied(to, tracker.topologies(), exclusiveSyncPoint.syncId, exclusiveSyncPoint.ranges, exclusiveSyncPoint.syncId), this);
+        node.send(tracker.nodes(), to -> new WaitUntilApplied(to, tracker.topologies(), exclusiveSyncPoint.syncId, exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.syncId), this);
     }
 
     @Override
@@ -87,7 +88,7 @@
             // TODO (required): we also need to handle ranges not being safe to read
             if (tracker.recordSuccess(from) == RequestStatus.Success)
             {
-                node.configService().reportEpochRedundant(exclusiveSyncPoint.ranges, exclusiveSyncPoint.syncId.epoch());
+                node.configService().reportEpochRedundant(exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.syncId.epoch());
                 node.send(tracker.nodes(), new SetShardDurable(exclusiveSyncPoint));
                 trySuccess(null);
             }
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 6713694..82cf850 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -19,28 +19,32 @@
 package accord.coordinate;
 
 import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.local.Node;
 import accord.messages.Apply;
 import accord.messages.PreAccept.PreAcceptOk;
 import accord.primitives.Ballot;
 import accord.primitives.Deps;
-import accord.primitives.FullRangeRoute;
 import accord.primitives.FullRoute;
-import accord.primitives.Ranges;
+import accord.primitives.Seekables;
 import accord.primitives.SyncPoint;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
+import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
-import accord.utils.Invariants;
 
 import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
 import static accord.primitives.Timestamp.mergeMax;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static accord.utils.Functions.foldl;
+import static accord.utils.Invariants.checkArgument;
 
 /**
  * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
@@ -48,45 +52,80 @@
  *
  * TODO (desired, testing): dedicated burn test to validate outcomes
  */
-public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+public class CoordinateSyncPoint<S extends Seekables<?, ?>> extends CoordinatePreAccept<SyncPoint<S>>
 {
-    final Ranges ranges;
-    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Ranges ranges)
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(CoordinateSyncPoint.class);
+
+    final S keysOrRanges;
+    // Whether to wait on the dependencies applying globally before returning a result
+    final boolean async;
+
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route, S keysOrRanges, boolean async)
     {
         super(node, txnId, txn, route, node.topology().withOpenEpochs(route, txnId, txnId));
-        this.ranges = ranges;
+        checkArgument(txnId.rw() == Kind.SyncPoint || async, "Exclusive sync points only support async application");
+        this.keysOrRanges = keysOrRanges;
+        this.async = async;
     }
 
-    public static AsyncResult<SyncPoint> exclusive(Node node, Ranges keysOrRanges)
+    public static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> exclusive(Node node, S keysOrRanges)
     {
-        return coordinate(node, ExclusiveSyncPoint, keysOrRanges);
+        return coordinate(node, ExclusiveSyncPoint, keysOrRanges, true);
     }
 
-    public static AsyncResult<SyncPoint> inclusive(Node node, Ranges keysOrRanges)
+    public static <S extends Seekables<?, ?>> CoordinateSyncPoint<S> exclusive(Node node, TxnId txnId, S keysOrRanges)
     {
-        return coordinate(node, Kind.SyncPoint, keysOrRanges);
+        return coordinate(node, txnId, keysOrRanges, true);
     }
 
-    private static AsyncResult<SyncPoint> coordinate(Node node, Kind kind, Ranges ranges)
+    public static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> inclusive(Node node, S keysOrRanges, boolean async)
     {
-        TxnId txnId = node.nextTxnId(kind, ranges.domain());
-        return node.withEpoch(txnId.epoch(), () -> {
-            FullRangeRoute route = (FullRangeRoute) node.computeRoute(txnId, ranges);
-            CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, ranges), route, ranges);
-            coordinate.start();
-            return coordinate;
-        }).beginAsResult();
+        return coordinate(node, Kind.SyncPoint, keysOrRanges, async);
     }
 
-    public static AsyncResult<SyncPoint> coordinate(Node node, TxnId txnId, Ranges ranges)
+    private static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> coordinate(Node node, Kind kind, S keysOrRanges, boolean async)
     {
-        Invariants.checkState(txnId.rw() == Kind.SyncPoint || txnId.rw() == ExclusiveSyncPoint);
-        FullRangeRoute route = (FullRangeRoute) node.computeRoute(txnId, ranges);
-        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(txnId.rw(), ranges), route, ranges);
+        checkArgument(kind == Kind.SyncPoint || kind == ExclusiveSyncPoint);
+        node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain());
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        return node.withEpoch(txnId.epoch(), () ->
+                AsyncChains.success(coordinate(node, txnId, keysOrRanges, async))
+        ).beginAsResult();
+    }
+
+    private static <S extends Seekables<?, ?>> CoordinateSyncPoint<S> coordinate(Node node, TxnId txnId, S keysOrRanges, boolean async)
+    {
+        checkArgument(txnId.rw() == Kind.SyncPoint || txnId.rw() == ExclusiveSyncPoint);
+        FullRoute route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint<S> coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(txnId.rw(), keysOrRanges), route, keysOrRanges, async);
         coordinate.start();
         return coordinate;
     }
 
+    static <S extends Seekables<?, ?>> void blockOnDeps(Node node, Txn txn, TxnId txnId, FullRoute<?> route, S keysOrRanges, Deps deps, BiConsumer<SyncPoint<S>, Throwable> callback, boolean async)
+    {
+        // If deps are empty there is nothing to wait on application for so we can return immediately
+        boolean processAsyncCompletion = deps.isEmpty() || async;
+        BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, (result, throwable) -> {
+            // Don't want to process completion twice
+            if (processAsyncCompletion)
+            {
+                // Don't lose the error
+                if (throwable != null)
+                    node.agent().onUncaughtException(throwable);
+                return;
+            }
+            if (throwable != null)
+                callback.accept(null, throwable);
+            else
+                callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, route, false), null);
+        });
+        // Notify immediately and the caller can add a listener to command completion to track local application
+        if (processAsyncCompletion)
+            callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, route, true), null);
+    }
+
     @Override
     void onNewEpoch(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> successes)
     {
@@ -112,16 +151,10 @@
             executeAt = txnId;
             // we don't need to fetch deps from Accept replies, so we don't need to contact unsynced epochs
             topologies = node.topology().forEpoch(route, txnId.epoch());
-            new Propose<SyncPoint>(node, topologies, Ballot.ZERO, txnId, txn, route, executeAt, deps, this)
-            {
-                @Override
-                void onAccepted()
-                {
-                    Apply.sendMaximal(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null));
-                    node.configService().reportEpochClosed(ranges, txnId.epoch());
-                    accept(new SyncPoint(txnId, deps, ranges, (FullRangeRoute) route), null);
-                }
-            }.start();
+            if (tracker.hasFastPathAccepted() && txnId.rw() == Kind.SyncPoint)
+                blockOnDeps(node, txn, txnId, route, keysOrRanges, deps, this, async);
+            else
+                ProposeSyncPoint.proposeSyncPoint(node, topologies, Ballot.ZERO, txnId, txn, route, deps, executeAt, this, async, tracker.nodes(), keysOrRanges);
         }
     }
 
@@ -129,7 +162,7 @@
     {
         TxnId txnId = syncPoint.syncId;
         Timestamp executeAt = txnId;
-        Txn txn = node.agent().emptyTxn(txnId.rw(), syncPoint.ranges);
+        Txn txn = node.agent().emptyTxn(txnId.rw(), syncPoint.keysOrRanges);
         Deps deps = syncPoint.waitFor;
         Apply.sendMaximal(node, to, txnId, syncPoint.route(), txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null));
     }
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 7a64395..edfd033 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -36,6 +36,7 @@
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
+import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.topology.Topology;
@@ -43,6 +44,7 @@
 import static accord.coordinate.ReadCoordinator.Action.Approve;
 import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
 import static accord.messages.Commit.Kind.Maximal;
+import static accord.utils.Invariants.checkArgument;
 
 class Execute extends ReadCoordinator<ReadReply>
 {
@@ -69,7 +71,14 @@
 
     public static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback)
     {
-        if (txn.read().keys().isEmpty())
+        // Recovery calls execute and we would like execute to run BlockOnDeps because that will notify the agent
+        // of the local barrier
+        if (txn.kind() == Kind.SyncPoint)
+        {
+            checkArgument(txnId.equals(executeAt));
+            BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, callback);
+        }
+        else if (txn.read().keys().isEmpty())
         {
             Result result = txn.result(txnId, executeAt, null);
             Persist.persist(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), result);
@@ -105,7 +114,7 @@
     {
         if (reply.isOk())
         {
-            ReadOk ok = (ReadOk) reply;
+            ReadOk ok = ((ReadOk) reply);
             Data next = ok.data;
             if (next != null)
                 data = data == null ? next : data.merge(next);
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index b72408e..0d27f75 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -62,10 +62,10 @@
     private final Map<Id, AcceptReply> debug = debug() ? new HashMap<>() : null;
     final Timestamp executeAt;
     final QuorumTracker acceptTracker;
-    final BiConsumer<? super R, Throwable> callback;
+    final BiConsumer<R, Throwable> callback;
     private boolean isDone;
 
-    Propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super R, Throwable> callback)
+    Propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<R, Throwable> callback)
     {
         this.node = node;
         this.ballot = ballot;
diff --git a/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java
new file mode 100644
index 0000000..66db85e
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Apply;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRangeRoute;
+import accord.primitives.FullRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+
+public class ProposeSyncPoint<S extends Seekables<?, ?>> extends Propose<SyncPoint<S>>
+{
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(ProposeSyncPoint.class);
+
+    // Whether to wait on the dependencies applying globally before returning a result
+    final boolean async;
+    final Set<Id> fastPathNodes;
+
+    final S keysOrRanges;
+
+    ProposeSyncPoint(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, Timestamp executeAt, BiConsumer<SyncPoint<S>, Throwable> callback, boolean async, Set<Id> fastPathNodes, S keysOrRanges)
+    {
+        super(node, topologies, ballot, txnId, txn, route, executeAt, deps, callback);
+        this.async = async;
+        this.fastPathNodes = fastPathNodes;
+        this.keysOrRanges = keysOrRanges;
+    }
+
+    public static <S extends Seekables<?, ?>> Propose<SyncPoint<S>> proposeSyncPoint(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, Timestamp executeAt, BiConsumer<SyncPoint<S>, Throwable> callback, boolean async, Set<Id> fastPathNodes, S keysOrRanges)
+    {
+        ProposeSyncPoint proposeSyncPoint = new ProposeSyncPoint(node, topologies, ballot, txnId, txn, route, deps, executeAt, callback, async, fastPathNodes, keysOrRanges);
+        proposeSyncPoint.start();
+        return proposeSyncPoint;
+    }
+
+    @Override
+    void onAccepted()
+    {
+        if (txnId.rw() == ExclusiveSyncPoint)
+        {
+            Apply.sendMaximal(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null));
+            node.configService().reportEpochClosed((Ranges)keysOrRanges, txnId.epoch());
+            callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, (FullRangeRoute) route, true), null);
+        }
+        else
+        {
+            CoordinateSyncPoint.blockOnDeps(node, txn, txnId, route, keysOrRanges, deps, callback, async);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index aaeeb86..0f03927 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -22,7 +22,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +34,9 @@
 import accord.local.Node;
 import accord.messages.Callback;
 import accord.messages.MessageType;
-import accord.messages.ReadData;
+import accord.messages.ReadData.ReadNack;
+import accord.messages.ReadData.ReadOk;
+import accord.messages.ReadData.ReadReply;
 import accord.messages.WaitAndReadData;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
@@ -47,6 +48,7 @@
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
+import javax.annotation.Nullable;
 
 import static accord.messages.ReadData.ReadNack.NotCommitted;
 import static accord.primitives.Routables.Slice.Minimal;
@@ -133,10 +135,10 @@
         Ranges ownedRanges = ownedRangesForNode(to);
         Invariants.checkArgument(ownedRanges.containsAll(ranges), "Got a reply from %s for ranges %s, but owned ranges %s does not contain all the ranges", to, ranges, ownedRanges);
         PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
-        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges), collectMaxApplied()), new Callback<ReadData.ReadReply>()
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges), collectMaxApplied()), new Callback<ReadReply>()
         {
             @Override
-            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            public void onSuccess(Node.Id from, ReadReply reply)
             {
                 if (!reply.isOk())
                 {
@@ -148,7 +150,7 @@
                     {
                         fail(to, new RuntimeException(reply.toString()));
                         inflight.remove(key).cancel();
-                        switch ((ReadData.ReadNack) reply)
+                        switch ((ReadNack) reply)
                         {
                             default: throw new AssertionError("Unhandled enum: " + reply);
                             case Invalid:
@@ -285,7 +287,7 @@
         }
     }
 
-    public static class FetchResponse extends ReadData.ReadOk
+    public static class FetchResponse extends ReadOk
     {
         public final @Nullable Timestamp maxApplied;
         public FetchResponse(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Timestamp maxApplied)
diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseries.java b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
index 4cd34f5..66d26ee 100644
--- a/accord-core/src/main/java/accord/impl/CommandTimeseries.java
+++ b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
@@ -26,6 +26,7 @@
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableSortedMap;
 
 import accord.api.Key;
@@ -121,6 +122,16 @@
                            @Nullable Status minStatus, @Nullable Status maxStatus,
                            SafeCommandStore.CommandFunction<T, T> map, T initialValue, T terminalValue)
     {
+        return mapReduceWithTerminate(testKind, testTimestamp, timestamp,
+                         testDep, depId,
+                         minStatus, maxStatus,
+                         map, initialValue, Predicates.equalTo(terminalValue));
+    }
+    public <T> T mapReduceWithTerminate(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+                           SafeCommandStore.TestDep testDep, @Nullable TxnId depId,
+                           @Nullable Status minStatus, @Nullable Status maxStatus,
+                           SafeCommandStore.CommandFunction<T, T> map, T initialValue, Predicate<T> terminatePredicate)
+    {
 
         for (D data : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values())
         {
@@ -136,8 +147,8 @@
             if (testDep != ANY_DEPS && (!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != (testDep == WITH))))
                 continue;
             Timestamp executeAt = loader.executeAt(data);
-            initialValue = map.apply(keyOrRange, txnId, executeAt, initialValue);
-            if (initialValue.equals(terminalValue))
+            initialValue = map.apply(keyOrRange, txnId, executeAt, status.status, initialValue);
+            if (terminatePredicate.test(initialValue))
                 break;
         }
         return initialValue;
diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java b/accord-core/src/main/java/accord/impl/CommandsForKey.java
index 48cbc7c..ed238af 100644
--- a/accord-core/src/main/java/accord/impl/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java
@@ -20,13 +20,19 @@
 
 import accord.api.Key;
 import accord.impl.CommandTimeseries.CommandLoader;
-import accord.local.*;
-import accord.primitives.*;
 import com.google.common.collect.ImmutableSortedMap;
 
-import java.util.*;
+import java.util.Objects;
 import java.util.function.Consumer;
 
+import accord.local.Command;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
 import static accord.local.Status.PreAccepted;
 import static accord.local.Status.PreCommitted;
 
diff --git a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
index 35b9ea7..e4db999 100644
--- a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
+++ b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
@@ -224,7 +224,7 @@
     {
         TxnId at = node.nextTxnId(ExclusiveSyncPoint, Domain.Range);
         node.scheduler().once(() -> node.withEpoch(at.epoch(), () -> {
-                           CoordinateSyncPoint.coordinate(node, at, ranges)
+                           CoordinateSyncPoint.exclusive(node, at, ranges)
                                .addCallback((success, fail) -> {
                                    if (fail != null) logger.trace("Exception coordinating exclusive sync point for local shard durability of {}", ranges, fail);
                                    else coordinateShardDurableAfterExclusiveSyncPoint(node, success);
diff --git a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java
index 5aea436..e9eea9b 100644
--- a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java
+++ b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java
@@ -18,10 +18,9 @@
 
 package accord.impl;
 
-import java.util.Collection;
-import java.util.Collections;
-
 import accord.local.Command;
+import accord.local.Command.TransientListener;
+import accord.local.Listeners;
 import accord.local.SafeCommand;
 import accord.local.SaveStatus;
 import accord.primitives.TxnId;
@@ -70,9 +69,9 @@
     }
 
     @Override
-    public Collection<Command.TransientListener> transientListeners()
+    public Listeners<TransientListener> transientListeners()
     {
-        return Collections.emptyList();
+        return Listeners.EMPTY;
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index ff9ebdc..fe47f27 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -20,13 +20,13 @@
 
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
@@ -38,6 +38,8 @@
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,9 +75,9 @@
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
-import javax.annotation.Nullable;
 
 import static accord.local.Command.NotDefined.uninitialised;
+
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH;
 import static accord.local.Status.Committed;
@@ -194,7 +196,7 @@
         return attrs;
     }
 
-    private <O> O mapReduceForKey(InMemorySafeStore safeStore, Routables<?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+    private <O> O mapReduceForKey(InMemorySafeStore safeStore, Routables<?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, Predicate<O> terminate)
     {
         switch (keysOrRanges.domain()) {
             default:
@@ -208,7 +210,7 @@
                     if (forKey.current() == null)
                         continue;
                     accumulate = map.apply(forKey.current(), accumulate);
-                    if (accumulate.equals(terminalValue))
+                    if (terminate.test(accumulate))
                         return accumulate;
                 }
                 break;
@@ -222,7 +224,7 @@
                         if (forKey.value() == null)
                             continue;
                         accumulate = map.apply(forKey.value(), accumulate);
-                        if (accumulate.equals(terminalValue))
+                        if (terminate.test(accumulate))
                             return accumulate;
                     }
                 }
@@ -532,9 +534,9 @@
             return super.value(value);
         }
 
-        public Collection<Command.TransientListener> transientListeners()
+        public Listeners<Command.TransientListener> transientListeners()
         {
-            return transientListeners == null ? Collections.emptySet() : transientListeners;
+            return transientListeners == null ? Listeners.EMPTY : transientListeners;
         }
     }
 
@@ -553,6 +555,18 @@
         }
     }
 
+    private static class TimestampAndStatus
+    {
+        public final Timestamp timestamp;
+        public final Status status;
+
+        public TimestampAndStatus(Timestamp timestamp, Status status)
+        {
+            this.timestamp = timestamp;
+            this.status = status;
+        }
+    }
+
     public static class InMemorySafeStore extends AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeCommandsForKey>
     {
         private final InMemoryCommandStore commandStore;
@@ -640,7 +654,7 @@
         @Override
         public Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
         {
-            Timestamp timestamp = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> Timestamp.nonNullOrMax(forKey.max(), prev), Timestamp.NONE, null);
+            Timestamp timestamp = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> Timestamp.nonNullOrMax(forKey.max(), prev), Timestamp.NONE, Objects::isNull);
             Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
             for (RangeCommand command : commandStore.rangeCommands.values())
             {
@@ -701,6 +715,12 @@
         @Override
         public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
         {
+            return mapReduceWithTerminate(keysOrRanges, slice, testKind, testTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, accumulate, Predicate.isEqual(terminalValue));
+        }
+
+        @Override
+        public <T> T mapReduceWithTerminate(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, Predicate<T> terminate)
+        {
             accumulate = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> {
                 CommandTimeseries<?> timeseries;
                 switch (testTimestamp)
@@ -726,15 +746,15 @@
                     case MAY_EXECUTE_BEFORE:
                         remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
                 }
-                return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
-            }, accumulate, terminalValue);
+                return timeseries.mapReduceWithTerminate(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminate);
+            }, accumulate, terminate);
 
-            if (accumulate.equals(terminalValue))
+            if (terminate.test(accumulate))
                 return accumulate;
 
             // TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
             Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
-            Map<Range, List<Map.Entry<TxnId, Timestamp>>> collect = new TreeMap<>(Range::compare);
+            Map<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> collect = new TreeMap<>(Range::compare);
             commandStore.rangeCommands.forEach(((txnId, rangeCommand) -> {
                 Command command = rangeCommand.command.value();
                 // TODO (now): probably this isn't safe - want to ensure we take dependency on any relevant syncId
@@ -783,9 +803,9 @@
 
                 Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> {
                     // TODO (easy, efficiency): pass command as a parameter to Fold
-                    List<Map.Entry<TxnId, Timestamp>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
+                    List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
                     if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(command.txnId()))
-                        list.add(new AbstractMap.SimpleImmutableEntry<>(command.txnId(), command.executeAt()));
+                        list.add(new AbstractMap.SimpleImmutableEntry<>(command.txnId(), new TimestampAndStatus(command.executeAt(), command.status())));
                     return in;
                 }, collect);
             }));
@@ -814,20 +834,20 @@
 
                     Routables.foldl(ranges, sliced, (r, in, i) -> {
                         // TODO (easy, efficiency): pass command as a parameter to Fold
-                        List<Map.Entry<TxnId, Timestamp>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
+                        List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
                         if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(txnId))
-                            list.add(new AbstractMap.SimpleImmutableEntry<>(txnId, txnId));
+                            list.add(new AbstractMap.SimpleImmutableEntry<>(txnId, new TimestampAndStatus(txnId, Status.NotDefined)));
                         return in;
                     }, collect);
                 }));
             }
 
-            for (Map.Entry<Range, List<Map.Entry<TxnId, Timestamp>>> e : collect.entrySet())
+            for (Map.Entry<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> e : collect.entrySet())
             {
-                for (Map.Entry<TxnId, Timestamp> command : e.getValue())
+                for (Map.Entry<TxnId, TimestampAndStatus> command : e.getValue())
                 {
                     T initial = accumulate;
-                    accumulate = map.apply(e.getKey(), command.getKey(), command.getValue(), initial);
+                    accumulate = map.apply(e.getKey(), command.getKey(), command.getValue().timestamp, command.getValue().status, initial);
                 }
             }
 
diff --git a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
index e8de7d8..9c4c2bd 100644
--- a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
+++ b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
@@ -18,11 +18,11 @@
 
 package accord.impl;
 
-import java.util.Collection;
 import java.util.function.Supplier;
 
 import accord.impl.InMemoryCommandStore.GlobalCommand;
 import accord.local.Command;
+import accord.local.Listeners;
 import accord.local.SafeCommand;
 import accord.primitives.TxnId;
 
@@ -72,7 +72,7 @@
     }
 
     @Override
-    public Collection<Command.TransientListener> transientListeners()
+    public Listeners<Command.TransientListener> transientListeners()
     {
         return global.transientListeners();
     }
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index 420d54d..ae39592 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -25,6 +25,10 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.api.ProgressLog;
 import accord.coordinate.FetchData;
@@ -37,6 +41,7 @@
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.local.SaveStatus;
+
 import accord.local.SaveStatus.LocalExecution;
 import accord.local.Status;
 import accord.local.Status.Known;
@@ -51,7 +56,6 @@
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncResult;
-import javax.annotation.Nullable;
 
 import static accord.api.ProgressLog.ProgressShard.Unsure;
 import static accord.coordinate.InformHomeOfTxn.inform;
@@ -74,8 +78,12 @@
 // TODO (expected): report long-lived recurring transactions / operations
 public class SimpleProgressLog implements ProgressLog.Factory
 {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleProgressLog.class);
+
     enum Progress { NoneExpected, Expected, NoProgress, Investigating, Done }
 
+    public static volatile boolean PAUSE_FOR_TEST = false;
+
     enum CoordinateStatus
     {
         NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done;
@@ -690,6 +698,12 @@
             isScheduled = false;
             try
             {
+                if (PAUSE_FOR_TEST)
+                {
+                    logger.info("Skipping progress log because it is paused for test");
+                    return;
+                }
+
                 for (State.Monitoring run : this)
                 {
                     if (run.shouldRun())
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java
index 638e5ea..41ce28e 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -23,13 +23,11 @@
 import java.util.Set;
 import java.util.function.BiConsumer;
 
-import javax.annotation.Nullable;
-
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.DataStore.FetchRanges;
 import accord.api.DataStore.FetchResult;
 import accord.api.DataStore.StartingRangeFetch;
-import accord.api.DataStore.FetchRanges;
 import accord.coordinate.CoordinateNoOp;
 import accord.coordinate.CoordinateSyncPoint;
 import accord.primitives.Ranges;
@@ -40,6 +38,7 @@
 import accord.utils.Invariants;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
+import javax.annotation.Nullable;
 
 import static accord.local.PreLoadContext.contextFor;
 import static accord.local.PreLoadContext.empty;
@@ -133,7 +132,7 @@
             // of these ranges as part of this attempt
             Ranges commitRanges = valid;
             store.markBootstrapping(safeStore0, globalSyncId, valid);
-            CoordinateSyncPoint.coordinate(node, globalSyncId, commitRanges)
+            CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
                // TODO (correcness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges!
                // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed
                .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys()), safeStore1 -> {
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 18d0d60..49ebb5c 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -363,7 +363,7 @@
     @Override
     public abstract Status.Durability durability();
     @Override
-    public abstract Listeners.Immutable durableListeners();
+    public abstract Listeners.Immutable<DurableAndIdempotentListener> durableListeners();
     public abstract SaveStatus saveStatus();
 
     static boolean isSameClass(Command command, Class<? extends Command> klass)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index 9a0f846..ae45381 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -40,7 +40,6 @@
 import accord.api.ProgressLog;
 import accord.coordinate.CollectDeps;
 import accord.local.Command.WaitingOn;
-import accord.local.CommandStores.RangesForEpoch;
 import accord.primitives.FullRoute;
 import accord.primitives.KeyDeps;
 import accord.primitives.Participants;
@@ -57,6 +56,7 @@
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
+import accord.local.CommandStores.RangesForEpoch;
 
 import static accord.api.ConfigurationService.EpochReady.DONE;
 import static accord.local.PreLoadContext.contextFor;
@@ -205,7 +205,7 @@
                                    SafeCommandStore.TestTimestamp.STARTED_AFTER, Timestamp.NONE,
                                    SafeCommandStore.TestDep.ANY_DEPS, null,
                                    Status.Applied, Status.Applied,
-                                   (key, txnId, executeAt, max) -> Timestamp.max(max, executeAt),
+                                   (key, txnId, executeAt, status, max) -> Timestamp.max(max, executeAt),
                                    Timestamp.NONE, Timestamp.MAX);
     }
 
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index 357f8cf..a1308b0 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -56,6 +56,9 @@
 import org.agrona.collections.Hashing;
 import org.agrona.collections.Int2ObjectHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static accord.api.ConfigurationService.EpochReady.done;
 import static accord.local.PreLoadContext.empty;
 import static accord.primitives.Routables.Slice.Minimal;
@@ -67,6 +70,9 @@
  */
 public abstract class CommandStores
 {
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(CommandStores.class);
+
     public interface Factory
     {
         CommandStores create(NodeTimeService time,
@@ -215,6 +221,11 @@
             return allInternal(0, ceilIndex(toExclusive));
         }
 
+        public @Nonnull Ranges allAfter(long fromInclusive)
+        {
+            return allInternal(floorIndex(fromInclusive), ranges.length);
+        }
+
         public @Nonnull Ranges allUntil(long toInclusive)
         {
             return allInternal(0, 1 + floorIndex(toInclusive));
@@ -456,6 +467,12 @@
 
                 return null;
             }
+
+            @Override
+            public String toString()
+            {
+                return forEach.getClass().getName();
+            }
         });
     }
 
diff --git a/accord-core/src/main/java/accord/local/Listeners.java b/accord-core/src/main/java/accord/local/Listeners.java
index 84849af..4afd6b4 100644
--- a/accord-core/src/main/java/accord/local/Listeners.java
+++ b/accord-core/src/main/java/accord/local/Listeners.java
@@ -25,6 +25,8 @@
 
 public class Listeners<L extends Command.Listener> extends DeterministicSet<L>
 {
+    public static Listeners EMPTY = new Listeners<>();
+
     public Listeners()
     {
     }
@@ -34,7 +36,7 @@
         super(copy);
     }
 
-    public static class Immutable extends Listeners<Command.DurableAndIdempotentListener>
+    public static class Immutable<L extends Command.Listener> extends Listeners<L>
     {
         public static final Immutable EMPTY = new Immutable();
 
@@ -43,18 +45,18 @@
             super();
         }
 
-        public Immutable(Listeners<Command.DurableAndIdempotentListener> listeners)
+        public Immutable(Listeners<L> listeners)
         {
             super(listeners);
         }
 
-        Listeners<Command.DurableAndIdempotentListener> mutable()
+        Listeners<L> mutable()
         {
             return new Listeners<>(this);
         }
 
         @Override
-        public boolean add(Command.DurableAndIdempotentListener item)
+        public boolean add(L item)
         {
             throw new UnsupportedOperationException("Cannot modify immutable set");
         }
@@ -72,7 +74,7 @@
         }
 
         @Override
-        public boolean addAll(Collection<? extends Command.DurableAndIdempotentListener> c)
+        public boolean addAll(Collection<? extends L> c)
         {
             throw new UnsupportedOperationException("Cannot modify immutable set");
         }
@@ -84,7 +86,7 @@
         }
 
         @Override
-        public boolean removeIf(Predicate<? super Command.DurableAndIdempotentListener> filter)
+        public boolean removeIf(Predicate<? super L> filter)
         {
             throw new UnsupportedOperationException("Cannot modify immutable set");
         }
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index f0ee2bd..6492ddf 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -40,6 +40,8 @@
 
 import accord.api.Agent;
 import accord.api.ConfigurationService;
+
+import accord.api.BarrierType;
 import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
 import accord.api.Key;
@@ -66,6 +68,7 @@
 import accord.primitives.ProgressToken;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
+import accord.coordinate.Barrier;
 import accord.primitives.Routable.Domain;
 import accord.primitives.Routables;
 import accord.primitives.Route;
@@ -524,6 +527,26 @@
         return new TxnId(uniqueNow(), rw, domain);
     }
 
+    /**
+     * Trigger one of several different kinds of barrier transactions on a key or range with different properties. Barriers ensure that all prior transactions
+     * have their side effects visible up to some point.
+     *
+     * Local barriers will look for a local transaction that was applied in minEpoch or later and returns when one exists or completes.
+     * It may, but it is not guaranteed to, trigger a global barrier transaction that effects the barrier at all replicas.
+     *
+     * A global barrier is guaranteed to create a distributed barrier transaction, and if it is synchronous will not return until the
+     * transaction has applied at a quorum globally (meaning all dependencies and their side effects are already visible). If it is asynchronous
+     * it will return once the barrier has been applied locally.
+     *
+     * Ranges are only supported for global barriers.
+     *
+     * Returns the Timestamp the barrier actually ended up occurring at. Keep in mind for local barriers it doesn't mean a new transaction was created.
+     */
+    public AsyncResult<Timestamp> barrier(Seekables keysOrRanges, long minEpoch, BarrierType barrierType)
+    {
+        return Barrier.barrier(this, keysOrRanges, minEpoch, barrierType);
+    }
+
     public AsyncResult<Result> coordinate(Txn txn)
     {
         return coordinate(nextTxnId(txn.kind(), txn.keys().domain()), txn);
@@ -652,7 +675,12 @@
         return future;
     }
 
-    public void receive(Request request, Id from, ReplyContext replyContext)
+    public void receive (Request request, Id from, ReplyContext replyContext)
+    {
+        receive(request, from, replyContext, 0);
+    }
+
+    public void receive (Request request, Id from, ReplyContext replyContext, long delayNanos)
     {
         long knownEpoch = request.knownEpoch();
         if (knownEpoch > topology.epoch())
@@ -665,7 +693,7 @@
                 return;
             }
         }
-        scheduler.now(() -> {
+        Runnable processMsg = () -> {
             try
             {
                 request.process(this, from, replyContext);
@@ -674,7 +702,11 @@
             {
                 reply(from, replyContext, null, t);
             }
-        });
+        };
+        if (delayNanos > 0)
+            scheduler.once(processMsg, delayNanos, TimeUnit.NANOSECONDS);
+        else
+            scheduler.now(processMsg);
     }
 
     public Scheduler scheduler()
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java
index 0ef8f55..cacf81a 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -185,6 +185,8 @@
 
     static PreLoadContext empty()
     {
-        return contextFor(null, Collections.emptyList(), Keys.EMPTY);
+        return EMPTY_PRELOADCONTEXT;
     }
+
+    PreLoadContext EMPTY_PRELOADCONTEXT = contextFor(null, Collections.emptyList(), Keys.EMPTY);
 }
diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java b/accord-core/src/main/java/accord/local/SafeCommand.java
index 38de775..a02e294 100644
--- a/accord-core/src/main/java/accord/local/SafeCommand.java
+++ b/accord-core/src/main/java/accord/local/SafeCommand.java
@@ -18,9 +18,8 @@
 
 package accord.local;
 
-import java.util.Collection;
-
 import accord.api.Result;
+import accord.local.Command.TransientListener;
 import accord.local.Command.Truncated;
 import accord.primitives.Ballot;
 import accord.primitives.Timestamp;
@@ -42,7 +41,7 @@
     public abstract boolean invalidated();
     public abstract void addListener(Command.TransientListener listener);
     public abstract boolean removeListener(Command.TransientListener listener);
-    public abstract Collection<Command.TransientListener> transientListeners();
+    public abstract Listeners<Command.TransientListener> transientListeners();
 
     public boolean isEmpty()
     {
@@ -69,6 +68,12 @@
         return update(Command.addListener(current(), listener));
     }
 
+    public void addAndInvokeListener(SafeCommandStore safeStore, TransientListener listener)
+    {
+        addListener(listener);
+        listener.onChange(safeStore, this);
+    }
+
     public Command removeListener(Command.DurableAndIdempotentListener listener)
     {
         Command current = current();
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 559f859..eb9574e 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -18,7 +18,7 @@
 
 package accord.local;
 
-import java.util.Collection;
+import java.util.Iterator;
 import java.util.function.Predicate;
 
 import accord.api.Agent;
@@ -36,7 +36,6 @@
 import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
-
 import javax.annotation.Nullable;
 
 import static accord.local.Commands.Cleanup.NO;
@@ -55,7 +54,7 @@
 {
     public interface CommandFunction<I, O>
     {
-        O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, I in);
+        O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, Status status, I in);
     }
 
     public enum TestTimestamp
@@ -213,10 +212,15 @@
      * Visits keys first and then ranges, both in ascending order.
      * Within each key or range visits TxnId in ascending order of queried timestamp.
      */
-    public abstract <T> T mapReduce(Seekables<?, ?> keys, Ranges slice,
+    public abstract <T> T mapReduceWithTerminate(Seekables<?, ?> keysOrRanges, Ranges slice,
+                       TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+                       TestDep testDep, @Nullable TxnId depId,
+                       @Nullable Status minStatus, @Nullable Status maxStatus,
+                       CommandFunction<T, T> map, T accumulate, Predicate<T> terminate);
+    public abstract <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice,
                     TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
                     TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus,
-                    CommandFunction<T, T> map, T initialValue, T terminalValue);
+                    CommandFunction<T, T> map, T accumulate, T terminalValue);
 
     protected abstract void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command);
     protected abstract void register(Seekable keyOrRange, Ranges slice, Command command);
@@ -254,13 +258,21 @@
         notifyListeners(safeCommand, command, command.durableListeners(), safeCommand.transientListeners());
     }
 
-    public void notifyListeners(SafeCommand safeCommand, Command command, Listeners<Command.DurableAndIdempotentListener> durableListeners, Collection<Command.TransientListener> transientListeners)
+    public void notifyListeners(SafeCommand safeCommand, Command command, Listeners<Command.DurableAndIdempotentListener> durableListeners, Listeners<Command.TransientListener> transientListeners)
     {
-        for (Command.DurableAndIdempotentListener listener : durableListeners)
+        Iterator<Command.DurableAndIdempotentListener> durableIterator = durableListeners.reverseIterator();
+        while (durableIterator.hasNext())
+        {
+            Command.DurableAndIdempotentListener listener = durableIterator.next();
             notifyListener(this, safeCommand, command, listener);
+        }
 
-        for (Command.TransientListener listener : transientListeners)
+        Iterator<Command.TransientListener> transientIterator = transientListeners.reverseIterator();
+        while (transientIterator.hasNext())
+        {
+            Command.TransientListener listener = transientIterator.next();
             notifyListener(this, safeCommand, command, listener);
+        }
     }
 
     public static void notifyListener(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, Command.TransientListener listener)
diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java
index 2e1d201..3947bff 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -21,6 +21,7 @@
 import javax.annotation.Nullable;
 
 import accord.api.Result;
+import accord.api.RoutingKey;
 import accord.local.Commands;
 import accord.local.Node;
 import accord.local.Node.Id;
@@ -130,9 +131,15 @@
         node.mapReduceConsumeLocal(this, txnId.epoch(), executeAt.epoch(), this);
     }
 
+
     @Override
     public ApplyReply apply(SafeCommandStore safeStore)
     {
+        return apply(safeStore, txn, txnId, executeAt, deps, scope, writes, result, progressKey);
+    }
+
+    public static ApplyReply apply(SafeCommandStore safeStore, PartialTxn txn, TxnId txnId, Timestamp executeAt, PartialDeps deps, PartialRoute<?> scope, Writes writes, Result result, RoutingKey progressKey)
+    {
         SafeCommand safeCommand = safeStore.get(txnId, executeAt, scope);
         switch (Commands.apply(safeStore, safeCommand, txnId, scope, progressKey, executeAt, deps, txn, writes, result))
         {
diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
new file mode 100644
index 0000000..79a5b35
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.messages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.local.CommandStore;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.messages.Apply.ApplyReply;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+
+/*
+ * Used by local and global inclusive sync points to effect the sync point at each node
+ * Combines commit, execute (with nothing really to execute), and apply into one request/response
+ */
+public class ApplyThenWaitUntilApplied extends WaitUntilApplied
+{
+    private static final Logger logger = LoggerFactory.getLogger(ReadData.class);
+
+    @SuppressWarnings("unused")
+    public static class SerializerSupport
+    {
+        public static ApplyThenWaitUntilApplied create(TxnId txnId, PartialRoute<?> route, PartialDeps deps, Seekables<?, ?> partialTxnKeys, Writes writes, Result result, boolean notifyAgent)
+        {
+            return new ApplyThenWaitUntilApplied(txnId, route, deps, partialTxnKeys, writes, result, notifyAgent);
+        }
+    }
+
+    public final PartialRoute<?> route;
+    public final PartialDeps deps;
+    public final Writes writes;
+    public final Result txnResult;
+    public final boolean notifyAgent;
+    public final Seekables<?, ?> partialTxnKeys;
+
+    ApplyThenWaitUntilApplied(TxnId txnId, PartialRoute<?> route, PartialDeps deps, Seekables<?, ?> partialTxnKeys, Writes writes, Result txnResult, boolean notifyAgent)
+    {
+        super(txnId, partialTxnKeys.toParticipants(), txnId, txnId.epoch());
+        this.route = route;
+        this.deps = deps;
+        this.writes = writes;
+        this.txnResult = txnResult;
+        this.notifyAgent = notifyAgent;
+        this.partialTxnKeys = partialTxnKeys;
+    }
+
+    @Override
+    public ReadType kind()
+    {
+        return ReadType.applyThenWaitUntilApplied;
+    }
+
+    @Override
+    public ReadNack apply(SafeCommandStore safeStore)
+    {
+        RoutingKey progressKey = TxnRequest.progressKey(node, txnId.epoch(), txnId, route);
+        ApplyReply applyReply = Apply.apply(safeStore, null, txnId, txnId, deps, route, writes, txnResult, progressKey);
+        switch (applyReply)
+        {
+            default:
+                throw new IllegalStateException("Unexpected ApplyReply");
+            case Insufficient:
+                throw new IllegalStateException("ApplyThenWaitUntilApplied is always sent with a maximal `Commit` so how can `Apply` have an `Insufficient` result");
+            case Redundant:
+            case Applied:
+                // In both cases it's fine to continue to process and return a response saying
+                // things were applied
+                break;
+        }
+        return super.apply(safeStore);
+    }
+
+    @Override
+    protected void readComplete(CommandStore commandStore, Data readResult, Ranges unavailable)
+    {
+        logger.trace("{}: readComplete ApplyThenWaitUntilApplied", txnId);
+        commandStore.execute(PreLoadContext.contextFor(txnId), safeStore -> {
+            super.readComplete(commandStore, readResult, unavailable);
+        }).begin(node.agent());
+    }
+
+    @Override
+    protected void onAllReadsComplete()
+    {
+        if (notifyAgent)
+            node.agent().onLocalBarrier(partialTxnKeys, txnId);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "WaitForDependenciesThenApply{" +
+                "txnId:" + txnId +
+                '}';
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 1efd652..e23a764 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -341,7 +341,7 @@
         {
             // any transaction that started
             commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedBefore.rw()), STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
-                    (keyOrRange, txnId, executeAt, prev) -> {
+                    (keyOrRange, txnId, executeAt, saveStatus, prev) -> {
                         if (executeAt.compareTo(startedBefore) > 0)
                             builder.add(keyOrRange, txnId);
                         return builder;
@@ -355,7 +355,7 @@
         try (Deps.Builder builder = Deps.builder())
         {
             commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedBefore.rw()), STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null,
-                    (keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), (Deps.AbstractBuilder<Deps>)builder, null);
+                    (keyOrRange, txnId, executeAt, saveStatus, prev) -> builder.add(keyOrRange, txnId), builder, (Deps.AbstractBuilder<Deps>)null);
             return builder.build();
         }
     }
@@ -370,7 +370,7 @@
          * has not witnessed us we can safely invalidate (us).
          */
         return commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedAfter.rw()), STARTED_AFTER, startedAfter, WITHOUT, startedAfter, Accepted, PreCommitted,
-                (keyOrRange, txnId, executeAt, prev) -> true, false, true);
+                (keyOrRange, txnId, executeAt, saveStatus, prev) -> true, false, true);
     }
 
     private static boolean hasCommittedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
@@ -383,6 +383,6 @@
          * has not witnessed us we can safely invalidate it.
          */
         return commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedAfter.rw()), EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null,
-                (keyOrRange, txnId, executeAt, prev) -> true,false, true);
+                (keyOrRange, txnId, executeAt, saveStatus, prev) -> true,false, true);
     }
 }
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index 97e67cc..2e30e05 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -18,7 +18,6 @@
 package accord.messages;
 
 import java.util.Set;
-
 import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
@@ -49,6 +48,9 @@
 import accord.topology.Topologies;
 import accord.topology.Topology;
 import accord.utils.Invariants;
+import accord.utils.TriFunction;
+
+import static accord.utils.Invariants.checkArgument;
 
 public class Commit extends TxnRequest<ReadNack>
 {
@@ -56,9 +58,9 @@
 
     public static class SerializerSupport
     {
-        public static Commit create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+        public static Commit create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
         {
-            return new Commit(kind, txnId, scope, waitForEpoch, executeAt, partialTxn, partialDeps, fullRoute, read);
+            return new Commit(kind, txnId, scope, waitForEpoch, executeAt, partialTxn, partialDeps, fullRoute, readData);
         }
     }
 
@@ -67,14 +69,25 @@
     public final @Nullable PartialTxn partialTxn;
     public final PartialDeps partialDeps;
     public final @Nullable FullRoute<?> route;
-    public final ReadTxnData read;
+    public final ReadData readData;
 
     public enum Kind { Minimal, Maximal }
 
     // TODO (low priority, clarity): cleanup passing of topologies here - maybe fetch them afresh from Node?
     //                               Or perhaps introduce well-named classes to represent different topology combinations
+
+    public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, ReadTxnData read)
+    {
+        this(kind, to, coordinateTopology, topologies, txnId, txn, route, executeAt, deps, (u1, u2, u3) -> read);
+    }
+
     public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, @Nullable Participants<?> readScope, Timestamp executeAt, Deps deps, boolean read)
     {
+        this(kind, to, coordinateTopology, topologies, txnId, txn, route, executeAt, deps, read ? new ReadTxnData(to, topologies, txnId, readScope, executeAt) : null);
+    }
+
+    public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, TriFunction<Txn, PartialRoute<?>, PartialDeps, ReadData> toExecuteFactory)
+    {
         super(to, topologies, route, txnId);
 
         FullRoute<?> sendRoute = null;
@@ -100,10 +113,10 @@
         this.partialTxn = partialTxn;
         this.partialDeps = deps.slice(scope.covering());
         this.route = sendRoute;
-        this.read = read ? new ReadTxnData(to, topologies, txnId, readScope, executeAt) : null;
+        this.readData = toExecuteFactory.apply(partialTxn != null ? partialTxn : txn, scope, partialDeps);
     }
 
-    Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+    Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
     {
         super(txnId, scope, waitForEpoch);
         this.kind = kind;
@@ -111,7 +124,7 @@
         this.partialTxn = partialTxn;
         this.partialDeps = partialDeps;
         this.route = fullRoute;
-        this.read = read;
+        this.readData = readData;
     }
 
     // TODO (low priority, clarity): accept Topology not Topologies
@@ -141,6 +154,25 @@
         }
     }
 
+    public static void commitMaximalAndBlockOnDeps(Node node, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, Callback<ReadReply> callback)
+    {
+        checkArgument(topologies.size() == 1);
+        Topology topology = topologies.get(0);
+        for (Node.Id to : topology.nodes())
+        {
+            // To simplify making sure the agent is notified once and is notified before the barrier coordination
+            // returns a result; we never notify the agent on the coordinator as part of WaitForDependenciesThenApply execution
+            // Also always send a maximal commit since we don't block on deps often and that saves having to have an Insufficient code path
+            // going back for the Apply in `ApplyThenWaitUntilApplied
+            boolean notifyAgent = !to.equals(node.id());
+            Commit commit = new Commit(
+                    Kind.Maximal, to, topology, topologies, txnId,
+                    txn, route, txnId, deps,
+                    (maybePartialTransaction, partialRoute, partialDeps) -> new ApplyThenWaitUntilApplied(txnId, partialRoute, partialDeps, maybePartialTransaction.keys().slice(partialDeps.covering), txn.execute(txnId, txnId, null), txn.result(txnId, txnId, null), notifyAgent));
+            node.send(to, commit, callback);
+        }
+    }
+
     @Override
     public TxnId primaryTxnId()
     {
@@ -188,8 +220,8 @@
     {
         if (reply != null || failure != null)
             node.reply(replyTo, replyContext, reply, failure);
-        else if (read != null)
-            read.process(node, replyTo, replyContext);
+        else if (readData != null)
+            readData.process(node, replyTo, replyContext);
     }
 
     @Override
@@ -210,7 +242,7 @@
                ", txnId: " + txnId +
                ", executeAt: " + executeAt +
                ", deps: " + partialDeps +
-               ", read: " + read +
+               ", toExecute: " + readData +
                '}';
     }
 
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index aab5e0c..42f1ad4 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -17,54 +17,55 @@
  */
 package accord.messages;
 
-import static accord.messages.MessageType.Kind.REMOTE;
 import static accord.messages.MessageType.Kind.LOCAL;
+import static accord.messages.MessageType.Kind.REMOTE;
 
 /**
  * Meant to assist implementations with mapping accord messages to their own messaging systems.
  */
 public enum MessageType
 {
-    SIMPLE_RSP               (REMOTE, false),
-    FAILURE_RSP              (REMOTE, false),
-    PRE_ACCEPT_REQ           (REMOTE, true ),
-    PRE_ACCEPT_RSP           (REMOTE, false),
-    ACCEPT_REQ               (REMOTE, true ),
-    ACCEPT_RSP               (REMOTE, false),
-    ACCEPT_INVALIDATE_REQ    (REMOTE, true ),
-    GET_DEPS_REQ             (REMOTE, false),
-    GET_DEPS_RSP             (REMOTE, false),
-    COMMIT_MINIMAL_REQ       (REMOTE, true ),
-    COMMIT_MAXIMAL_REQ       (REMOTE, true ),
-    COMMIT_INVALIDATE_REQ    (REMOTE, true ),
-    APPLY_MINIMAL_REQ        (REMOTE, true ),
-    APPLY_MAXIMAL_REQ        (REMOTE, true ),
-    APPLY_RSP                (REMOTE, false),
-    READ_REQ                 (REMOTE, false),
-    READ_RSP                 (REMOTE, false),
-    BEGIN_RECOVER_REQ        (REMOTE, true ),
-    BEGIN_RECOVER_RSP        (REMOTE, false),
-    BEGIN_INVALIDATE_REQ     (REMOTE, true ),
-    BEGIN_INVALIDATE_RSP     (REMOTE, false),
-    WAIT_ON_COMMIT_REQ       (REMOTE, false),
-    WAIT_ON_COMMIT_RSP       (REMOTE, false),
-    WAIT_ON_APPLY_REQ        (REMOTE, false),
-    INFORM_OF_TXN_REQ        (REMOTE, true ),
-    INFORM_DURABLE_REQ       (REMOTE, true ),
-    INFORM_HOME_DURABLE_REQ  (REMOTE, true ),
-    CHECK_STATUS_REQ         (REMOTE, false),
-    CHECK_STATUS_RSP         (REMOTE, false),
-    FETCH_DATA_REQ           (REMOTE, false),
-    FETCH_DATA_RSP           (REMOTE, false),
-    SET_SHARD_DURABLE_REQ    (REMOTE, true ),
-    SET_GLOBALLY_DURABLE_REQ (REMOTE, true ),
-    QUERY_DURABLE_BEFORE_REQ (REMOTE, false),
-    QUERY_DURABLE_BEFORE_RSP (REMOTE, false),
+    SIMPLE_RSP                         (REMOTE, false),
+    FAILURE_RSP                        (REMOTE, false),
+    PRE_ACCEPT_REQ                     (REMOTE, true ),
+    PRE_ACCEPT_RSP                     (REMOTE, false),
+    ACCEPT_REQ                         (REMOTE, true ),
+    ACCEPT_RSP                         (REMOTE, false),
+    ACCEPT_INVALIDATE_REQ              (REMOTE, true ),
+    GET_DEPS_REQ                       (REMOTE, false),
+    GET_DEPS_RSP                       (REMOTE, false),
+    COMMIT_MINIMAL_REQ                 (REMOTE, true ),
+    COMMIT_MAXIMAL_REQ                 (REMOTE, true ),
+    COMMIT_INVALIDATE_REQ              (REMOTE, true ),
+    APPLY_MINIMAL_REQ                  (REMOTE, true ),
+    APPLY_MAXIMAL_REQ                  (REMOTE, true ),
+    APPLY_RSP                          (REMOTE, false),
+    READ_REQ                           (REMOTE, false),
+    READ_RSP                           (REMOTE, false),
+    BEGIN_RECOVER_REQ                  (REMOTE, true ),
+    BEGIN_RECOVER_RSP                  (REMOTE, false),
+    BEGIN_INVALIDATE_REQ               (REMOTE, true ),
+    BEGIN_INVALIDATE_RSP               (REMOTE, false),
+    WAIT_ON_COMMIT_REQ                 (REMOTE, false),
+    WAIT_ON_COMMIT_RSP                 (REMOTE, false),
+    WAIT_UNTIL_APPLIED_REQ             (REMOTE, false),
+    INFORM_OF_TXN_REQ                  (REMOTE, true ),
+    INFORM_DURABLE_REQ                 (REMOTE, true ),
+    INFORM_HOME_DURABLE_REQ            (REMOTE, true ),
+    CHECK_STATUS_REQ                   (REMOTE, false),
+    CHECK_STATUS_RSP                   (REMOTE, false),
+    FETCH_DATA_REQ                     (REMOTE, false),
+    FETCH_DATA_RSP                     (REMOTE, false),
+    SET_SHARD_DURABLE_REQ              (REMOTE, true ),
+    SET_GLOBALLY_DURABLE_REQ           (REMOTE, true ),
+    QUERY_DURABLE_BEFORE_REQ           (REMOTE, false),
+    QUERY_DURABLE_BEFORE_RSP           (REMOTE, false),
+    APPLY_AND_WAIT_UNTIL_APPLIED_REQ   (REMOTE, true ),
 
-    PROPAGATE_PRE_ACCEPT_MSG (LOCAL,  true ),
-    PROPAGATE_COMMIT_MSG     (LOCAL,  true ),
-    PROPAGATE_APPLY_MSG      (LOCAL,  true ),
-    PROPAGATE_OTHER_MSG      (LOCAL,  true ),
+    PROPAGATE_PRE_ACCEPT_MSG           (LOCAL,  true ),
+    PROPAGATE_COMMIT_MSG               (LOCAL,  true ),
+    PROPAGATE_APPLY_MSG                (LOCAL,  true ),
+    PROPAGATE_OTHER_MSG                (LOCAL,  true ),
     ;
 
     public enum Kind { LOCAL, REMOTE }
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index 1be51ca..ce72ceb 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -22,6 +22,9 @@
 import java.util.Objects;
 import javax.annotation.Nullable;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.local.Command;
 import accord.local.Commands;
 import accord.local.Node.Id;
@@ -49,6 +52,9 @@
 
 public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> implements EpochSupplier
 {
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(PreAccept.class);
+
     public static class SerializerSupport
     {
         public static PreAccept create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, long maxEpoch, PartialTxn partialTxn, @Nullable FullRoute<?> fullRoute)
@@ -257,7 +263,7 @@
         //       This is necessary for reporting to a bootstrapping replica which TxnId it must not prune from dependencies
         //       i.e. the source replica reports to the target replica those TxnId that STARTED_BEFORE and EXECUTES_AFTER.
         commandStore.mapReduce(keys, ranges, testKind, STARTED_BEFORE, executeAt, ANY_DEPS, null, null, null,
-                (keyOrRange, testTxnId, testExecuteAt, in) -> {
+                (keyOrRange, testTxnId, testExecuteAt, saveStatus, in) -> {
                     // TODO (easy, efficiency): either pass txnId as parameter or encode this behaviour in a specialised builder to avoid extra allocations
                     if (!testTxnId.equals(txnId))
                         in.add(keyOrRange, testTxnId);
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 66b5234..3eacce6 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -28,6 +28,7 @@
 import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
+import accord.messages.ReadData.ReadNack;
 import accord.primitives.PartialTxn;
 import accord.primitives.Participants;
 import accord.primitives.Ranges;
@@ -41,10 +42,40 @@
 import static accord.messages.TxnRequest.latestRelevantEpochIndex;
 
 // TODO (required, efficiency): dedup - can currently have infinite pending reads that will be executed independently
-public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack>
+public abstract class ReadData extends AbstractEpochRequest<ReadNack>
 {
     private static final Logger logger = LoggerFactory.getLogger(ReadData.class);
 
+    public enum ReadType
+    {
+        readTxnData(0),
+        waitUntilApplied(1),
+        applyThenWaitUntilApplied(2);
+
+        public final byte val;
+
+        ReadType(int val)
+        {
+            this.val = (byte)val;
+        }
+
+        @SuppressWarnings("unused")
+        public static ReadType fromValue(byte val)
+        {
+            switch (val)
+            {
+                case 0:
+                    return readTxnData;
+                case 1:
+                    return waitUntilApplied;
+                case 2:
+                    return applyThenWaitUntilApplied;
+                default:
+                    throw new IllegalArgumentException("Unrecognized ReadType value " + val);
+            }
+        }
+    }
+
     // TODO (expected, cleanup): should this be a Route?
     public final Participants<?> readScope;
     private final long waitForEpoch;
@@ -68,10 +99,14 @@
         this.waitForEpoch = waitForEpoch;
     }
 
+    abstract public ReadType kind();
+
     protected abstract void cancel();
     protected abstract long executeAtEpoch();
     protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail);
 
+    protected void onAllReadsComplete() {}
+
     @Override
     public long waitForEpoch()
     {
@@ -128,7 +163,10 @@
         // and prevents races where we respond before dispatching all the required reads (if the reads are
         // completing faster than the reads can be setup on all required shards)
         if (-1 == --waitingOnCount)
+        {
+            onAllReadsComplete();
             reply(this.unavailable, data, null);
+        }
     }
 
     protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable)
@@ -236,4 +274,4 @@
             return true;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/ReadTxnData.java
index 4e31910..5dec9ba 100644
--- a/accord-core/src/main/java/accord/messages/ReadTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java
@@ -18,8 +18,6 @@
 
 package accord.messages;
 
-import javax.annotation.Nullable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +35,7 @@
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
+import javax.annotation.Nullable;
 
 import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
 import static accord.local.Status.Committed;
@@ -99,6 +98,12 @@
     }
 
     @Override
+    public ReadType kind()
+    {
+        return ReadType.readTxnData;
+    }
+
+    @Override
     protected long executeAtEpoch()
     {
         return executeAtEpoch;
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index 5a37fa9..f383e60 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -20,6 +20,7 @@
 
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
+import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
 import accord.utils.MapReduceConsume;
 
@@ -39,13 +40,13 @@
     @Override
     public void process()
     {
-        node.mapReduceConsumeLocal(this, exclusiveSyncPoint.ranges, waitForEpoch(), waitForEpoch(), this);
+        node.mapReduceConsumeLocal(this, exclusiveSyncPoint.keysOrRanges, waitForEpoch(), waitForEpoch(), this);
     }
 
     @Override
     public SimpleReply apply(SafeCommandStore safeStore)
     {
-        safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, exclusiveSyncPoint.ranges);
+        safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges);
         return Ok;
     }
 
diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java b/accord-core/src/main/java/accord/messages/TxnRequest.java
index c324cf6..0025329 100644
--- a/accord-core/src/main/java/accord/messages/TxnRequest.java
+++ b/accord-core/src/main/java/accord/messages/TxnRequest.java
@@ -20,17 +20,22 @@
 
 import java.util.function.BiFunction;
 
-import accord.local.SafeCommandStore;
-import accord.utils.MapReduceConsume;
-import accord.primitives.*;
-import accord.utils.Invariants;
-
 import accord.api.RoutingKey;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Routables;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import accord.topology.Topologies;
 import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.MapReduceConsume;
 
 import static java.lang.Long.min;
 
@@ -76,11 +81,11 @@
         }
 
         @Override
-        RoutingKey progressKey(Node node)
+        RoutingKey progressKey()
         {
             if (doNotComputeProgressKey)
                 return null;
-            return super.progressKey(node);
+            return super.progressKey();
         }
     }
 
@@ -136,14 +141,19 @@
         this.node = on;
         this.replyTo = replyTo;
         this.replyContext = replyContext;
-        this.progressKey = progressKey(node); // TODO (low priority, clarity): not every class that extends TxnRequest needs this set
+        this.progressKey = progressKey(); // TODO (low priority, clarity): not every class that extends TxnRequest needs this set
         process();
     }
 
-    RoutingKey progressKey(Node node)
+    RoutingKey progressKey()
+    {
+        return progressKey(node, waitForEpoch, txnId, scope);
+    }
+
+    public static RoutingKey progressKey(Node node, long waitForEpoch, TxnId txnId, PartialRoute<?> scope)
     {
         // if waitForEpoch < txnId.epoch, then this replica's ownership is unchanged
-        long progressEpoch = min(waitForEpoch(), txnId.epoch());
+        long progressEpoch = min(waitForEpoch, txnId.epoch());
         return node.trySelectProgressKey(progressEpoch, scope, scope.homeKey());
     }
 
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index 682c33b..730055c 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -238,6 +238,12 @@
     }
 
     @Override
+    public ReadType kind()
+    {
+        return ReadType.waitUntilApplied;
+    }
+
+    @Override
     protected void cancel()
     {
         node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> removeListener(in, txnId), node.agent()));
@@ -246,7 +252,7 @@
     @Override
     public MessageType type()
     {
-        return MessageType.WAIT_ON_APPLY_REQ;
+        return MessageType.WAIT_UNTIL_APPLIED_REQ;
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/primitives/Seekables.java b/accord-core/src/main/java/accord/primitives/Seekables.java
index 80929d7..07b2f22 100644
--- a/accord-core/src/main/java/accord/primitives/Seekables.java
+++ b/accord-core/src/main/java/accord/primitives/Seekables.java
@@ -19,6 +19,7 @@
 package accord.primitives;
 
 import accord.api.RoutingKey;
+import accord.primitives.Routable.Domain;
 
 import static accord.primitives.Routables.Slice.Overlapping;
 
@@ -37,4 +38,9 @@
     Participants<?> toParticipants();
 
     FullRoute<?> toRoute(RoutingKey homeKey);
+    
+    static Seekables<?, ?> of(Seekable seekable)
+    {
+        return seekable.domain() == Domain.Range ? Ranges.of(seekable.asRange()) : Keys.of(seekable.asKey());
+    }
 }
diff --git a/accord-core/src/main/java/accord/primitives/SyncPoint.java b/accord-core/src/main/java/accord/primitives/SyncPoint.java
index da2b823..fedc64c 100644
--- a/accord-core/src/main/java/accord/primitives/SyncPoint.java
+++ b/accord-core/src/main/java/accord/primitives/SyncPoint.java
@@ -29,41 +29,44 @@
  * so while these are processed much like a transaction, they are invisible to real transactions which
  * may proceed before this is witnessed by the node processing it.
  */
-public class SyncPoint
+public class SyncPoint<S extends Seekables<?, ?>>
 {
     public static class SerializationSupport
     {
-        public static SyncPoint construct(TxnId syncId, Deps waitFor, Ranges ranges, RoutingKey homeKey)
+        public static SyncPoint construct(TxnId syncId, Deps waitFor, Seekables<?,?> keysOrRanges, RoutingKey homeKey, boolean finishedAsync)
         {
-            return new SyncPoint(syncId, waitFor, ranges, homeKey);
+            return new SyncPoint(syncId, waitFor, keysOrRanges, homeKey, finishedAsync);
         }
     }
 
     public final TxnId syncId;
     public final Deps waitFor;
-    public final Ranges ranges;
+    public final S keysOrRanges;
     public final RoutingKey homeKey;
+    public final boolean finishedAsync;
 
-    public SyncPoint(TxnId syncId, Deps waitFor, Ranges ranges, FullRangeRoute route)
+    public SyncPoint(TxnId syncId, Deps waitFor, S keysOrRanges, FullRoute route, boolean finishedAsync)
     {
-        Invariants.checkArgument(ranges.toRoute(route.homeKey).equals(route), "Expected homeKey %s from route %s to be in ranges %s", route.homeKey, route, ranges);
+        Invariants.checkArgument(keysOrRanges.toRoute(route.homeKey()).equals(route), "Expected homeKey %s from route %s to be in ranges %s", route.homeKey(), route, keysOrRanges);
         this.syncId = syncId;
         this.waitFor = waitFor;
-        this.ranges = ranges;
+        this.keysOrRanges = keysOrRanges;
         this.homeKey = route.homeKey();
+        this.finishedAsync = finishedAsync;
     }
 
-    private SyncPoint(TxnId syncId, Deps waitFor, Ranges ranges, RoutingKey homeKey)
+    private SyncPoint(TxnId syncId, Deps waitFor, S keysOrRanges, RoutingKey homeKey, boolean finishedAsync)
     {
         this.syncId = syncId;
         this.waitFor = waitFor;
-        this.ranges = ranges;
+        this.keysOrRanges = keysOrRanges;
         this.homeKey = homeKey;
+        this.finishedAsync = finishedAsync;
     }
 
-    public FullRangeRoute route()
+    public FullRoute route()
     {
-        return ranges.toRoute(homeKey);
+        return keysOrRanges.toRoute(homeKey);
     }
 
     // TODO (required): document this and its usages; make sure call-sites make sense
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 4f07a29..abf22e8 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -20,11 +20,10 @@
 
 import accord.local.Node.Id;
 import accord.utils.Invariants;
+import javax.annotation.Nonnull;
 
 import static accord.utils.Invariants.checkArgument;
 
-import javax.annotation.Nonnull;
-
 public class Timestamp implements Comparable<Timestamp>, EpochSupplier
 {
     public static final Timestamp MAX = new Timestamp(Long.MAX_VALUE, Long.MAX_VALUE, Id.MAX);
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java
index c3384a2..9349086 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -253,7 +253,7 @@
 
     default Result result(TxnId txnId, Timestamp executeAt, @Nullable Data data)
     {
-        return query().compute(txnId, executeAt, data, read(), update());
+        return query().compute(txnId, executeAt, keys(), data, read(), update());
     }
 
     default Writes execute(TxnId txnId, Timestamp executeAt, @Nullable Data data)
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index aa3b5c6..27b43ef 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -35,8 +35,9 @@
 
 public class Topology
 {
-    public static final Topology EMPTY = new Topology(0, new Shard[0], Ranges.EMPTY, Collections.emptyMap(), Ranges.EMPTY, new int[0]);
+    public static final long EMPTY_EPOCH = 0;
     private static final int[] EMPTY_SUBSET = new int[0];
+    public static final Topology EMPTY = new Topology(EMPTY_EPOCH, new Shard[0], Ranges.EMPTY, Collections.emptyMap(), Ranges.EMPTY, EMPTY_SUBSET);
     final long epoch;
     final Shard[] shards;
     final Ranges ranges;
diff --git a/accord-core/src/main/java/accord/utils/DeterministicSet.java b/accord-core/src/main/java/accord/utils/DeterministicSet.java
index 7087e9e..ec0942f 100644
--- a/accord-core/src/main/java/accord/utils/DeterministicSet.java
+++ b/accord-core/src/main/java/accord/utils/DeterministicSet.java
@@ -104,6 +104,29 @@
         };
     }
 
+    public Iterator<T> reverseIterator()
+    {
+        return new Iterator<T>()
+        {
+            Entry<T> previous = head.prev;
+            @Override
+            public boolean hasNext()
+            {
+                return previous != head;
+            }
+
+            @Override
+            public T next()
+            {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+                T result = previous.item;
+                previous = previous.prev;
+                return result;
+            }
+        };
+    }
+
     @Override
     public boolean equals(Object o)
     {
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
index 0c54622..9f878c1 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
@@ -18,17 +18,22 @@
 
 package accord.utils.async;
 
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 
+import static accord.utils.Invariants.checkArgument;
+
 public class AsyncCallbacks
 {
-    public static <T> BiConsumer<? super T, Throwable> inExecutor(BiConsumer<? super T, Throwable> callback, Executor executor)
+    public static <T> BiConsumer<? super T, Throwable> inExecutorService(BiConsumer<? super T, Throwable> callback, ExecutorService es)
     {
+        // Checking for shutdown once here for the other `inExecutorService` as well as `AsyncChain.addCallback`
+        // So we don't repeat the check
+        checkArgument(!es.isShutdown(), "ExecutorService is shutdown");
         return (result, throwable) -> {
             try
             {
-                executor.execute(() -> callback.accept(result, throwable));
+                es.execute(() -> callback.accept(result, throwable));
             }
             catch (Throwable t)
             {
@@ -38,9 +43,9 @@
     }
 
 
-    public static <T> BiConsumer<? super T, Throwable> inExecutor(Runnable runnable, Executor executor)
+    public static <T> BiConsumer<? super T, Throwable> inExecutorService(Runnable runnable, ExecutorService es)
     {
-        return inExecutor(toCallback(runnable), executor);
+        return inExecutorService(toCallback(runnable), es);
     }
 
     public static <T> BiConsumer<T, Throwable> toCallback(Runnable runnable) {
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index 130731d..43cc6db 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -19,6 +19,7 @@
 package accord.utils.async;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -83,14 +84,14 @@
         return addCallback(AsyncCallbacks.toCallback(runnable));
     }
 
-    default AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor)
+    default AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback, ExecutorService es)
     {
-        return addCallback(AsyncCallbacks.inExecutor(callback, executor));
+        return addCallback(AsyncCallbacks.inExecutorService(callback, es));
     }
 
-    default AsyncChain<V> addCallback(Runnable runnable, Executor executor)
+    default AsyncChain<V> addCallback(Runnable runnable, ExecutorService es)
     {
-        return addCallback(AsyncCallbacks.inExecutor(runnable, executor));
+        return addCallback(AsyncCallbacks.inExecutorService(runnable, es));
     }
 
     /**
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index ae5a1ce..08ea070 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -593,30 +593,14 @@
 
     public static <V> V getBlocking(AsyncChain<V> chain) throws InterruptedException, ExecutionException
     {
-        class Result
+        try
         {
-            final V result;
-            final Throwable failure;
-
-            public Result(V result, Throwable failure)
-            {
-                this.result = result;
-                this.failure = failure;
-            }
+            return getBlocking(chain, 0, TimeUnit.DAYS);
         }
-
-        AtomicReference<Result> callbackResult = new AtomicReference<>();
-        CountDownLatch latch = new CountDownLatch(1);
-
-        chain.begin((result, failure) -> {
-            callbackResult.set(new Result(result, failure));
-            latch.countDown();
-        });
-
-        latch.await();
-        Result result = callbackResult.get();
-        if (result.failure == null) return result.result;
-        else throw new ExecutionException(result.failure);
+        catch (TimeoutException e)
+        {
+            throw new IllegalStateException("Should not throw timeout exception e");
+        }
     }
 
     public static <V> V getBlockingAndRethrow(AsyncChain<V> chain)
@@ -677,8 +661,14 @@
             latch.countDown();
         });
 
-        if (!latch.await(timeout, unit))
-            throw new TimeoutException();
+        if (timeout > 0)
+        {
+            if (!latch.await(timeout, unit))
+                throw new TimeoutException();
+        }
+        else
+            latch.await();
+
         Result result = callbackResult.get();
         if (result.failure == null) return result.result;
         else throw new ExecutionException(result.failure);
@@ -686,6 +676,18 @@
 
     public static <V> V getUninterruptibly(AsyncChain<V> chain) throws ExecutionException
     {
+        try
+        {
+            return getUninterruptibly(chain, 0, TimeUnit.DAYS);
+        }
+        catch (TimeoutException e)
+        {
+            throw new IllegalStateException("Should not throw timeout exception e");
+        }
+    }
+
+    public static <V> V getUninterruptibly(AsyncChain<V> chain, long time, TimeUnit unit) throws ExecutionException, TimeoutException
+    {
         boolean interrupted = false;
         try
         {
@@ -693,7 +695,7 @@
             {
                 try
                 {
-                    return getBlocking(chain);
+                    return getBlocking(chain, time, unit);
                 }
                 catch (InterruptedException e)
                 {
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
index 3269f7a..944d517 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
@@ -18,7 +18,6 @@
 
 package accord.utils.async;
 
-import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 
 /**
@@ -36,18 +35,6 @@
         return addCallback(AsyncCallbacks.toCallback(runnable));
     }
 
-    @Override
-    default AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor)
-    {
-        return addCallback(AsyncCallbacks.inExecutor(callback, executor));
-    }
-
-    @Override
-    default AsyncResult<V> addCallback(Runnable runnable, Executor executor)
-    {
-        return addCallback(AsyncCallbacks.inExecutor(runnable, executor));
-    }
-
     boolean isDone();
     boolean isSuccess();
 
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 5773e68..ad7171f 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -46,7 +46,7 @@
         }
     }
 
-    static class AbstractResult<V> implements AsyncResult<V>
+    public static class AbstractResult<V> implements AsyncResult<V>
     {
         private static final AtomicReferenceFieldUpdater<AbstractResult, Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, Object.class, "state");
 
@@ -65,6 +65,17 @@
 
         private void notify(Listener<V> listener, Result<V> result)
         {
+            Listener<V> reversed = null;
+            Listener<V> tmp;
+            while (listener != null)
+            {
+                tmp = listener;
+                listener = listener.next;
+                tmp.next = reversed;
+                reversed = tmp;
+            }
+            listener = reversed;
+
             List<Throwable> failures = null;
             while (listener != null)
             {
@@ -109,6 +120,15 @@
             return trySetResult(new Result<>(result, failure));
         }
 
+        protected boolean trySuccess(V value)
+        {
+            return trySetResult(value, null);
+        }
+        protected boolean tryFailure(Throwable throwable)
+        {
+            return trySetResult(null, throwable);
+        }
+
         private  AsyncChain<V> newChain()
         {
             return new AsyncChains.Head<V>()
@@ -222,13 +242,13 @@
         @Override
         public boolean trySuccess(V value)
         {
-            return trySetResult(value, null);
+            return super.trySuccess(value);
         }
 
         @Override
         public boolean tryFailure(Throwable throwable)
         {
-            return trySetResult(null, throwable);
+            return super.tryFailure(throwable);
         }
     }
 
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 6d42388..3fc3ac9 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -25,6 +25,8 @@
 
 import com.google.common.collect.Sets;
 
+import java.time.Duration;
+
 import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.config.LocalConfig;
@@ -53,6 +55,9 @@
 import accord.utils.Invariants;
 import accord.utils.ThreadPoolScheduler;
 
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+
 import static accord.utils.async.AsyncChains.awaitUninterruptibly;
 
 public class Utils
@@ -161,4 +166,19 @@
         awaitUninterruptibly(node.unsafeStart());
         return node;
     }
+
+    public static void spinUntilSuccess(ThrowingRunnable runnable)
+    {
+        spinUntilSuccess(runnable, 10);
+    }
+
+    public static void spinUntilSuccess(ThrowingRunnable runnable, int timeoutInSeconds)
+    {
+        Awaitility.await()
+                  .pollInterval(Duration.ofMillis(100))
+                  .pollDelay(0, TimeUnit.MILLISECONDS)
+                  .atMost(timeoutInSeconds, TimeUnit.SECONDS)
+                  .ignoreExceptions()
+                  .untilAsserted(runnable);
+    }
 }
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
index 96be9fe..e12cb19 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
@@ -18,48 +18,94 @@
 
 package accord.coordinate;
 
-import accord.local.Node;
-import accord.impl.mock.MockCluster;
-import accord.api.Result;
-import accord.impl.mock.MockStore;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 
-import org.junit.jupiter.api.Assertions;
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import accord.api.Agent;
+import accord.api.BarrierType;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.impl.SimpleProgressLog;
+import accord.impl.TestAgent;
+import accord.impl.mock.MockCluster;
+import accord.impl.mock.MockStore;
+import accord.local.Command;
+import accord.local.Command.TransientListener;
+import accord.local.Commands;
+import accord.local.Commands.AcceptOutcome;
+import accord.local.Commands.ApplyOutcome;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.messages.ReadData.ReadOk;
 import accord.primitives.FullKeyRoute;
 import accord.primitives.FullRangeRoute;
 import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
 import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 
 import static accord.Utils.id;
 import static accord.Utils.ids;
 import static accord.Utils.ranges;
+import static accord.Utils.spinUntilSuccess;
 import static accord.Utils.writeTxn;
+import static accord.impl.IntKey.key;
 import static accord.impl.IntKey.keys;
 import static accord.impl.IntKey.range;
+import static accord.local.Status.Applied;
 import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Txn.Kind.Read;
 import static accord.primitives.Txn.Kind.Write;
+import static accord.utils.Invariants.checkState;
 import static accord.utils.async.AsyncChains.getUninterruptibly;
+import static com.google.common.base.Predicates.alwaysTrue;
+import static java.lang.Thread.sleep;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class CoordinateTransactionTest
 {
+    private static final Logger logger = LoggerFactory.getLogger(CoordinateTransactionTest.class);
+
+    @AfterEach
+    public void tearDown()
+    {
+        SimpleProgressLog.PAUSE_FOR_TEST = false;
+    }
+
     @Test
     void simpleTest() throws Throwable
     {
         try (MockCluster cluster = MockCluster.builder().build())
         {
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             TxnId txnId = node.nextTxnId(Write, Key);
             Keys keys = keys(10);
             Txn txn = writeTxn(keys);
             FullKeyRoute route = keys.toRoute(keys.get(0).toUnseekable());
             Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, route));
-            Assertions.assertEquals(MockStore.RESULT, result);
+            assertEquals(MockStore.RESULT, result);
         }
     }
 
@@ -69,14 +115,14 @@
         try (MockCluster cluster = MockCluster.builder().build())
         {
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             TxnId txnId = node.nextTxnId(Write, Key);
             Ranges keys = ranges(range(1, 2));
             Txn txn = writeTxn(keys);
             FullRangeRoute route = keys.toRoute(keys.get(0).someIntersectingRoutingKey(null));
             Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, route));
-            Assertions.assertEquals(MockStore.RESULT, result);
+            assertEquals(MockStore.RESULT, result);
         }
     }
 
@@ -86,7 +132,7 @@
         try (MockCluster cluster = MockCluster.builder().build())
         {
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             TxnId oldId1 = node.nextTxnId(Write, Key);
             TxnId oldId2 = node.nextTxnId(Write, Key);
@@ -98,11 +144,11 @@
                 Txn txn = writeTxn(keys);
                 FullKeyRoute route = keys.toRoute(keys.get(0).someIntersectingRoutingKey(null));
                 getUninterruptibly(CoordinateTransaction.coordinate(node, oldId1, txn, route));
-                Assertions.fail();
+                fail();
             }
             catch (ExecutionException e)
             {
-                Assertions.assertEquals(Invalidated.class, e.getCause().getClass());
+                assertEquals(Invalidated.class, e.getCause().getClass());
             }
 
             Keys keys = keys(2);
@@ -113,6 +159,150 @@
     }
 
     @Test
+    void barrierTest() throws Throwable
+    {
+        try (MockCluster cluster = MockCluster.builder().build())
+        {
+            Node node = cluster.get(1);
+            Agent agent = node.agent();
+            assertNotNull(node);
+            long epoch = node.epoch();
+
+            // This is checking for a local barrier so it should succeed even if we drop the completion messages from the other nodes
+            cluster.networkFilter.addFilter(id -> ImmutableSet.of(cluster.get(2).id(), cluster.get(3).id()).contains(id), alwaysTrue(), message -> message instanceof ReadOk);
+            // Should create a sync transaction since no pre-existing one can be used and return as soon as it is locally applied
+            Barrier<Keys> localInitiatingBarrier = Barrier.barrier(node, Keys.of(key(3)), node.epoch(), BarrierType.local);
+            // Sync transaction won't be created until callbacks for existing transaction check runs
+            Semaphore existingTransactionCheckCompleted = new Semaphore(0);
+            localInitiatingBarrier.existingTransactionCheck.addCallback((ignored1, ignored2) -> existingTransactionCheckCompleted.release());
+            assertTrue(existingTransactionCheckCompleted.tryAcquire(5, TimeUnit.SECONDS));
+            // It's possible for the callback to run before the sync point is created in a different callback
+            // because multiple callbacks can run concurrently. `addCallback` might see the finished result
+            // and run immediately in this thread
+            spinUntilSuccess(() -> checkState(localInitiatingBarrier.coordinateSyncPoint != null));
+            // Should be able to find the txnid now and wait for local application
+            TxnId initiatingBarrierSyncTxnId = localInitiatingBarrier.coordinateSyncPoint.txnId;
+            Semaphore barrierAppliedLocally = new Semaphore(0);
+            node.ifLocal(PreLoadContext.contextFor(initiatingBarrierSyncTxnId), key(3).toUnseekable(), epoch, (safeStore) ->
+                safeStore.get(initiatingBarrierSyncTxnId, key(3).toUnseekable()).addAndInvokeListener(
+                    safeStore,
+                    commandListener((safeStore2, command) -> {
+                        if (command.current().is(Applied))
+                            barrierAppliedLocally.release();
+                    }))
+            ).begin(agent);
+            assertTrue(barrierAppliedLocally.tryAcquire(5, TimeUnit.SECONDS));
+            // If the command is locally applied the future for the barrier should be completed as well and not waiting on messages from other nodes
+            getUninterruptibly(localInitiatingBarrier, 1, TimeUnit.SECONDS);
+            cluster.networkFilter.clear();
+
+            Keys globalSyncBarrierKeys = keys(2, 3);
+            // At least one other should have completed by the time it is locally applied, a down node should be fine since it is quorum
+            cluster.networkFilter.isolate(cluster.get(2).id());
+            Barrier<Keys> globalInitiatingBarrier = Barrier.barrier(node, globalSyncBarrierKeys, node.epoch(), BarrierType.global_sync);
+            Timestamp globalBarrierTimestamp = getUninterruptibly(globalInitiatingBarrier);
+            int localBarrierCount = ((TestAgent)agent).completedLocalBarriers.getOrDefault(globalBarrierTimestamp, new AtomicInteger(0)).get();
+            assertNotNull(globalInitiatingBarrier.coordinateSyncPoint);
+            assertEquals(2, localBarrierCount);
+            logger.info("Local barrier count " + localBarrierCount);
+            cluster.networkFilter.clear();
+
+            // The existing barrier should suffice here
+            Barrier<Keys> nonInitiatingLocalBarrier = Barrier.barrier(node, Keys.of(key(2)), node.epoch(), BarrierType.local);
+            Timestamp previousBarrierTimestamp = getUninterruptibly(nonInitiatingLocalBarrier);
+            assertNull(nonInitiatingLocalBarrier.coordinateSyncPoint);
+            assertEquals(previousBarrierTimestamp, getUninterruptibly(nonInitiatingLocalBarrier));
+            assertEquals(previousBarrierTimestamp, globalBarrierTimestamp);
+
+            // Sync over nothing should work
+            SyncPoint<Ranges> syncPoint = getUninterruptibly(getUninterruptibly(CoordinateSyncPoint.inclusive(node, ranges(range(99, 100)), false)));
+            assertEquals(node.epoch(), syncPoint.syncId.epoch());
+
+            // Keys and so on for the upcoming transaction pair
+            Keys keys = keys(1);
+            accord.api.Key key = keys.get(0);
+            RoutingKey homeKey = key.toUnseekable();
+            Ranges ranges = ranges(homeKey.asRange());
+            FullKeyRoute route = keys.toRoute(homeKey);
+            TxnId txnId = node.nextTxnId(Write, Key);
+
+            // Create a txn to block the one we are about to create after this
+            SimpleProgressLog.PAUSE_FOR_TEST = true;
+            TxnId blockingTxnId = new TxnId(txnId.epoch(), 1, Read, Key, new Id(1));
+            Txn blockingTxn = agent.emptyTxn(Read, keys);
+            PreLoadContext blockingTxnContext = PreLoadContext.contextFor(blockingTxnId, keys);
+            for (Node n : cluster)
+                assertEquals(AcceptOutcome.Success, getUninterruptibly(n.unsafeForKey(key).submit(blockingTxnContext, store ->
+                        Commands.preaccept(store, store.get(blockingTxnId, homeKey), blockingTxnId, blockingTxnId.epoch(), blockingTxn.slice(store.ranges().allAt(blockingTxnId), true), route, null))));
+
+            // Now create the transaction that should be blocked by the previous one
+            Txn txn = agent.emptyTxn(Write, keys);
+            PreLoadContext context = PreLoadContext.contextFor(txnId, keys);
+            for (Node n : cluster)
+                assertEquals(AcceptOutcome.Success, getUninterruptibly(n.unsafeForKey(key).submit(context, store ->
+                    Commands.preaccept(store, store.get(txnId, homeKey), txnId, txnId.epoch(), txn.slice(store.ranges().allAt(txnId.epoch()), true), route, null))));
+
+
+            CoordinateSyncPoint<Ranges> syncInclusiveSyncFuture = getUninterruptibly(CoordinateSyncPoint.inclusive(node, ranges, false));
+            // Shouldn't complete because it is blocked waiting for the dependency just created to apply
+            sleep(500);
+            assertFalse(syncInclusiveSyncFuture.isDone());
+
+            // Async sync should return a result immediately since we are going to wait on the sync point transaction that was created by the sync point
+            CoordinateSyncPoint<Ranges> asyncInclusiveSyncFuture = getUninterruptibly(CoordinateSyncPoint.inclusive(node, ranges, true));
+            SyncPoint<Ranges> localSyncPoint = getUninterruptibly(asyncInclusiveSyncFuture);
+            Semaphore localSyncOccurred = new Semaphore(0);
+            node.commandStores().ifLocal(PreLoadContext.contextFor(localSyncPoint.syncId), homeKey, epoch, epoch, safeStore ->
+                safeStore.get(localSyncPoint.syncId, homeKey.toUnseekable()).addAndInvokeListener(
+                    safeStore,
+                    commandListener((safeStore2, command) -> {
+                        if (command.current().hasBeen(Applied))
+                            localSyncOccurred.release();
+                    })
+                )
+            ).begin(agent);
+
+            // Move to preapplied in order to test that Barrier will find the transaction and add a listener
+            for (Node n : cluster)
+                getUninterruptibly(n.unsafeForKey(key).execute(context, store ->  {
+                    SafeCommand safeCommand = store.get(txnId, homeKey);
+                    Command command = safeCommand.current();
+                    PartialDeps.Builder depsBuilder = PartialDeps.builder(store.ranges().currentRanges());
+                    depsBuilder.add(key, blockingTxnId);
+                    PartialDeps partialDeps = depsBuilder.build();
+                    Commands.commit(store, safeCommand, txnId, route, null, command.partialTxn(), txnId, partialDeps);
+                    Commands.apply(store, safeCommand, txnId, route, null, txnId, partialDeps, command.partialTxn(), txn.execute(txnId, txnId, null), txn.query().compute(txnId, txnId, keys, null, null, null));
+                }));
+
+            Barrier<Keys> listeningLocalBarrier = Barrier.barrier(node, Keys.of(key), node.epoch(), BarrierType.local);
+            // Wait and make sure the existing transaction check worked and there is no coordinate sync point created
+            Thread.sleep(500);
+            assertNull(listeningLocalBarrier.coordinateSyncPoint);
+            assertNotNull(listeningLocalBarrier.existingTransactionCheck);
+            assertEquals(txnId, getUninterruptibly(listeningLocalBarrier.existingTransactionCheck).executeAt);
+            assertFalse(listeningLocalBarrier.isDone());
+
+            // Apply the blockingTxn to unblock the rest
+            for (Node n : cluster)
+                assertEquals(ApplyOutcome.Success, getUninterruptibly(n.unsafeForKey(key).submit(blockingTxnContext, store -> {
+                    return Commands.apply(store, store.get(blockingTxnId, homeKey), blockingTxnId, route, null, blockingTxnId, PartialDeps.builder(store.ranges().allAt(blockingTxnId.epoch())).build(), blockingTxn.slice(store.ranges().allAt(blockingTxnId.epoch()), true), blockingTxn.execute(blockingTxnId, blockingTxnId, null), blockingTxn.query().compute(blockingTxnId, blockingTxnId, keys, null, null, null));
+                })));
+            // Global sync should be unblocked
+            syncPoint = getUninterruptibly(syncInclusiveSyncFuture);
+            assertEquals(node.epoch(), syncPoint.syncId.epoch());
+            // Command listener for local sync transaction should get notified
+            assertTrue(localSyncOccurred.tryAcquire(5, TimeUnit.SECONDS));
+            // Listening local barrier should have succeeded in waiting on the local transaction that just applied
+            assertEquals(getUninterruptibly(listeningLocalBarrier), getUninterruptibly(listeningLocalBarrier.existingTransactionCheck).executeAt);
+            assertEquals(txnId, getUninterruptibly(listeningLocalBarrier));
+        }
+        finally
+        {
+            SimpleProgressLog.PAUSE_FOR_TEST = false;
+        }
+    }
+
+    @Test
     void slowPathTest() throws Throwable
     {
         try (MockCluster cluster = MockCluster.builder().nodes(7).replication(7).build())
@@ -120,11 +310,11 @@
             cluster.networkFilter.isolate(ids(5, 7));
 
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             Txn txn = writeTxn(keys(10));
             Result result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
-            Assertions.assertEquals(MockStore.RESULT, result);
+            assertEquals(MockStore.RESULT, result);
         }
     }
 
@@ -134,7 +324,7 @@
         txnId = new TxnId(txnId.epoch(), txnId.hlc() + clock, Write, Key, txnId.node);
         Txn txn = writeTxn(keys);
         Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, node.computeRoute(txnId, txn.keys())));
-        Assertions.assertEquals(MockStore.RESULT, result);
+        assertEquals(MockStore.RESULT, result);
         return txnId;
     }
 
@@ -144,7 +334,7 @@
         try (MockCluster cluster = MockCluster.builder().nodes(6).maxKey(600).build())
         {
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             TxnId txnId1 = coordinate(node, 100, keys(50, 350, 550));
             TxnId txnId2 = coordinate(node, 150, keys(250, 350, 450));
@@ -160,12 +350,12 @@
             cluster.networkFilter.isolate(ids(5, 7));
 
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             Keys keys = keys(10);
             Txn txn = new Txn.InMemory(keys, MockStore.read(Keys.EMPTY), MockStore.QUERY, MockStore.update(keys));
             Result result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
-            Assertions.assertEquals(MockStore.RESULT, result);
+            assertEquals(MockStore.RESULT, result);
         }
     }
 
@@ -177,12 +367,12 @@
             cluster.networkFilter.isolate(ids(5, 7));
 
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             Keys keys = keys(10);
             Txn txn = new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY, MockStore.update(Keys.EMPTY));
             Result result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
-            Assertions.assertEquals(MockStore.RESULT, result);
+            assertEquals(MockStore.RESULT, result);
         }
     }
 
@@ -192,18 +382,36 @@
         try (MockCluster cluster = MockCluster.builder().build())
         {
             Node node = cluster.get(1);
-            Assertions.assertNotNull(node);
+            assertNotNull(node);
 
             TxnId txnId = node.nextTxnId(Write, Key);
             Keys oneKey = keys(10);
             Keys twoKeys = keys(10, 20);
             Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), MockStore.QUERY, MockStore.update(twoKeys));
             Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, txn.keys().toRoute(oneKey.get(0).toUnseekable())));
-            Assertions.assertEquals(MockStore.RESULT, result);
+            assertEquals(MockStore.RESULT, result);
 
             txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), MockStore.QUERY, MockStore.update(Keys.EMPTY));
             result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
-            Assertions.assertEquals(MockStore.RESULT, result);
+            assertEquals(MockStore.RESULT, result);
         }
     }
+
+    private static Command.TransientListener commandListener(BiConsumer<SafeCommandStore, SafeCommand> listener)
+    {
+        return new TransientListener()
+        {
+            @Override
+            public void onChange(SafeCommandStore safeStore, SafeCommand command)
+            {
+                listener.accept(safeStore, command);
+            }
+
+            @Override
+            public PreLoadContext listenerPreLoadContext(TxnId caller)
+            {
+                return PreLoadContext.contextFor(caller);
+            }
+        };
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java
index 908108d..d3e4832 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -18,17 +18,33 @@
 
 package accord.impl;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Agent;
+import accord.api.Result;
 import accord.impl.mock.MockStore;
 import accord.local.Command;
 import accord.local.Node;
-import accord.api.Agent;
-import accord.api.Result;
-import accord.primitives.*;
-
-import java.util.concurrent.TimeUnit;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 
 public class TestAgent implements Agent
 {
+    private static final Logger logger = LoggerFactory.getLogger(TestAgent.class);
+
+    public static final ConcurrentMap<Timestamp, AtomicInteger> completedLocalBarriers = new ConcurrentHashMap<>();
+
     public static class RethrowAgent extends TestAgent
     {
         @Override
@@ -82,6 +98,7 @@
     @Override
     public void onUncaughtException(Throwable t)
     {
+        logger.error("Uncaught exception", t);
     }
 
     @Override
@@ -100,4 +117,9 @@
     {
         return new Txn.InMemory(kind, keysOrRanges, MockStore.read(Keys.EMPTY), MockStore.QUERY, null);
     }
+
+    @Override
+    public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull Timestamp executeAt) {
+        completedLocalBarriers.computeIfAbsent(executeAt, ignored -> new AtomicInteger()).incrementAndGet();
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index 239d355..9b90ddf 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -20,17 +20,19 @@
 
 import java.util.Map;
 
-import accord.api.Read;
-import accord.api.Update;
-import accord.local.Node.Id;
 import accord.api.Data;
 import accord.api.Key;
 import accord.api.Query;
+import accord.api.Read;
 import accord.api.Result;
+import accord.api.Update;
+import accord.local.Node.Id;
 import accord.primitives.Keys;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Timestamped;
+import javax.annotation.Nonnull;
 
 public class ListQuery implements Query
 {
@@ -44,7 +46,7 @@
     }
 
     @Override
-    public Result compute(TxnId txnId, Timestamp executeAt, Data data, Read untypedRead, Update update)
+    public Result compute(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Seekables<?, ?> keys, Data data, Read untypedRead, Update update)
     {
         if (data == null)
             return new ListResult(client, requestId, txnId, Keys.EMPTY, Keys.EMPTY, new int[0][0], (ListUpdate) update);
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index 8fa17d7..bae14b5 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -43,7 +43,7 @@
     };
 
     public static final Result RESULT = new Result() {};
-    public static final Query QUERY = (txnId, executeAt, data, read, update) -> RESULT;
+    public static final Query QUERY = (txnId, executeAt, keys, data, read, update) -> RESULT;
     public static final Write WRITE = (key, commandStore, executeAt, store, command) -> Writes.SUCCESS;
 
     public static Read read(Seekables<?, ?> keys)
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
index cf0621b..e75e3d9 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
@@ -19,15 +19,17 @@
 package accord.maelstrom;
 
 import java.util.Map;
+import javax.annotation.Nonnull;
 
-import accord.api.Read;
-import accord.api.Update;
-import accord.local.Node;
-import accord.local.Node.Id;
 import accord.api.Data;
 import accord.api.Key;
 import accord.api.Query;
+import accord.api.Read;
 import accord.api.Result;
+import accord.api.Update;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 
@@ -43,7 +45,7 @@
     }
 
     @Override
-    public Result compute(TxnId txnId, Timestamp executeAt, Data data, Read untypedRead, Update update)
+    public Result compute(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Seekables<?, ?> keys,  Data data, Read untypedRead, Update update)
     {
         MaelstromRead read = (MaelstromRead) untypedRead;
         Value[] values = new Value[read.readKeys.size()];