Split AsyncChain and AsyncResult; normalise AsyncResult with C* Future

patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20906
diff --git a/.gitmodules b/.gitmodules
index a0aed84..616dacf 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
 	path = modules/accord
-	url = https://github.com/belliottsmith/cassandra-accord.git
-	branch = 20905-ssverifier
+	url = https://github.com/apache/cassandra-accord.git
+	branch = trunk
diff --git a/modules/accord b/modules/accord
index ff4119f..78b84b0 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit ff4119fa45ad9700cfa94cd50b5cd4af9dd0b7a4
+Subproject commit 78b84b08e13530722cb785a3b748fd2075c1c449
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
index 9bf1595..9705fab 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -229,7 +229,7 @@
             long deadlineNanos = startedAtNanos + timeoutNanos;
             // TODO (expected): use the source bounds for the streams to avoid waiting unnecessarily long
             AccordService.getBlocking(accordService.maxConflict(accordRanges)
-                                                   .flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote, timeoutNanos, NANOSECONDS))
+                                                   .flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote, timeoutNanos, NANOSECONDS).chain())
                                       , accordRanges, new LatencyRequestBookkeeping(cfs.metric.accordPostStreamRepair), startedAtNanos, deadlineNanos);
         }
 
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index e69891c..87029aa 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -146,7 +146,6 @@
 import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
 import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
 import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
-import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -368,14 +367,14 @@
             List<Entry> cfks = new CopyOnWriteArrayList<>();
             PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query");
             CommandStores commandStores = AccordService.instance().node().commandStores();
-            getBlockingAndRethrow(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+            AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
                 SafeCommandsForKey safeCfk = safeStore.get(key);
                 CommandsForKey cfk = safeCfk.current();
                 if (cfk == null)
                     return;
 
                 cfks.add(new Entry(safeStore.commandStore().id(), cfk));
-            }).beginAsResult());
+            }));
 
             if (cfks.isEmpty())
                 return null;
@@ -478,7 +477,7 @@
             List<Entry> cfks = new CopyOnWriteArrayList<>();
             PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query");
             CommandStores commandStores = AccordService.instance().node().commandStores();
-            getBlockingAndRethrow(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+            AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
                 SafeCommandsForKey safeCfk = safeStore.get(key);
                 CommandsForKey cfk = safeCfk.current();
                 if (cfk == null)
@@ -1469,7 +1468,7 @@
                 {
                     consumer.accept(command.route(), result);
                 }
-                return result;
+                return result.chain();
             });
         }
 
@@ -1504,12 +1503,10 @@
         private void run(TxnId txnId, int commandStoreId, Function<SafeCommandStore, AsyncChain<Void>> apply)
         {
             AccordService accord = (AccordService) AccordService.instance();
-            AsyncChains.awaitUninterruptibly(accord.node()
-                                                   .commandStores()
-                                                   .forId(commandStoreId)
-                                                   .submit(PreLoadContext.contextFor(txnId, TXN_OPS), apply)
-                                                   .flatMap(i -> i)
-                                                   .beginAsResult());
+            AccordService.getBlocking(accord.node()
+                                            .commandStores()
+                                            .forId(commandStoreId)
+                                            .chain(PreLoadContext.contextFor(txnId, TXN_OPS), apply));
         }
 
         private void cleanup(TxnId txnId, int commandStoreId, Cleanup cleanup)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 5a73e8a..5513805 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -22,7 +22,6 @@
 import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -38,6 +37,7 @@
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
+import accord.api.AsyncExecutor;
 import accord.api.DataStore;
 import accord.api.Journal;
 import accord.api.LocalListeners;
@@ -247,7 +247,7 @@
     // TODO (desired): we use this for executing callbacks with mutual exclusivity,
     //  but we don't need to block the actual CommandStore - could quite easily
     //  inflate a separate queue dynamically in AccordExecutor
-    public Executor taskExecutor()
+    public AsyncExecutor taskExecutor()
     {
         return exclusiveExecutor;
     }
@@ -336,25 +336,30 @@
         return lastSystemTimestampMicros;
     }
     @Override
-    public <T> AsyncChain<T> build(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
+    public <T> AsyncChain<T> chain(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
     {
         return AccordTask.create(this, loadCtx, function).chain();
     }
 
     @Override
-    public <T> AsyncChain<T> build(Callable<T> task)
-    {
-        return AsyncChains.ofCallable(taskExecutor(), task);
-    }
-
-    @Override
-    public AsyncChain<Void> build(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
+    public AsyncChain<Void> chain(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
     {
         return AccordTask.create(this, preLoadContext, consumer).chain();
     }
 
-    public AccordSafeCommandStore begin(AccordTask<?> operation,
-                                        @Nullable CommandsForRanges commandsForRanges)
+    @Override
+    public <T> AsyncChain<T> chain(Callable<T> call)
+    {
+        return taskExecutor().chain(call);
+    }
+
+    @Override
+    public void execute(Runnable run)
+    {
+        taskExecutor().execute(run);
+    }
+
+    public AccordSafeCommandStore begin(AccordTask<?> operation, @Nullable CommandsForRanges commandsForRanges)
     {
         require(current == null);
         current = AccordSafeCommandStore.create(operation, commandsForRanges, this);
@@ -495,7 +500,7 @@
                 }
             }
 
-            ready.begin((success, fail) -> {
+            ready.invoke((success, fail) -> {
                 if (fail != null)
                 {
                     logger.error("{}: failed to ensure durability of {} ({})", this, ranges, reportId, fail);
@@ -541,7 +546,7 @@
             if (onlyNonDurable && !maybeShouldReplay(txnId))
                 return AsyncChains.success(null);
 
-            return store.submit(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
+            return store.chain(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
                 if (onlyNonDurable && !shouldReplay(txnId, safeStore.unsafeGet(txnId).current().participants()))
                     return null;
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 0049e71..5dc0d19 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -652,7 +652,7 @@
     public Future<Void> unsafeLocalSyncNotified(long epoch)
     {
         AsyncPromise<Void> promise = new AsyncPromise<>();
-        getOrCreateEpochState(epoch).localSyncNotified().begin((result, failure) -> {
+        getOrCreateEpochState(epoch).localSyncNotified().invoke((result, failure) -> {
             if (failure != null) promise.tryFailure(failure);
             else promise.trySuccess(result);
         });
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index b026168..3f91882 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -56,6 +56,10 @@
 import accord.utils.TriConsumer;
 import accord.utils.TriFunction;
 import accord.utils.UnhandledEnum;
+import accord.utils.async.AsyncCallbacks.CallAndCallback;
+import accord.utils.async.AsyncCallbacks.FlatCallAndCallback;
+import accord.utils.async.AsyncCallbacks.RunAndCallback;
+import accord.utils.async.AsyncCallbacks.RunOrFail;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.Cancellable;
@@ -430,16 +434,10 @@
     }
 
     @Override
-    public <T> AsyncChain<T> build(Callable<T> task)
+    public Cancellable execute(RunOrFail runOrFail)
     {
-        return new AsyncChains.Head<>()
-        {
-            @Override
-            protected Cancellable start(BiConsumer<? super T, Throwable> callback)
-            {
-                return submit(new PlainChain<>(task, callback, null));
-            }
-        };
+        PlainChain submit = new PlainChain(runOrFail);
+        return submit(submit);
     }
 
     public <T> AsyncChain<T> buildDebuggable(Callable<T> task, Object describe)
@@ -449,7 +447,7 @@
             @Override
             protected Cancellable start(BiConsumer<? super T, Throwable> callback)
             {
-                return submit(new DebuggableChain<>(task, callback, null, describe));
+                return submit(new DebuggableChain(new CallAndCallback<>(task, callback), null, 0, describe));
             }
         };
     }
@@ -581,7 +579,7 @@
 
     void submitExclusive(Runnable runnable)
     {
-        submitPlainExclusive(new PlainRunnable(null, runnable, null));
+        submitPlainExclusive(new PlainRunnable(null, runnable));
     }
 
     private void submitPlainExclusive(Plain task)
@@ -774,14 +772,14 @@
 
     public Future<?> submit(Runnable run)
     {
-        PlainRunnable task = new PlainRunnable(new AsyncPromise<>(), run, null);
+        PlainRunnable task = new PlainRunnable(new AsyncPromise<>(), run);
         submit(task);
         return task.result;
     }
 
     public void execute(Runnable command)
     {
-        submit(new PlainRunnable(null, command, null));
+        submit(new PlainRunnable(null, command));
     }
 
     private Cancellable submit(Plain task)
@@ -804,11 +802,6 @@
         }
     }
 
-    public void execute(Runnable command, AccordCommandStore commandStore)
-    {
-        submit(new PlainRunnable(null, command, commandStore.exclusiveExecutor));
-    }
-
     @Override
     public void setCapacity(long bytes)
     {
@@ -1107,53 +1100,91 @@
         }
 
         @Override
-        public <T> AsyncChain<T> build(Callable<T> call)
+        public AsyncChain<Void> chain(Runnable run)
         {
-            int position = inExecutor() && task != null ? task.queuePosition : 0;
+            int position = inheritQueuePosition();
             return new AsyncChains.Head<>()
             {
                 @Override
-                protected Cancellable start(BiConsumer<? super T, Throwable> callback)
+                protected Cancellable start(BiConsumer<? super Void, Throwable> callback)
                 {
-                    PlainChain<T> submit = new PlainChain<>(call, callback, SequentialExecutor.this);
-                    submit.queuePosition = position;
-                    return AccordExecutor.this.submit(submit);
+                    return execute(new RunAndCallback(run, callback), position);
                 }
             };
         }
 
         @Override
+        public <T> AsyncChain<T> chain(Callable<T> call)
+        {
+            int position = inheritQueuePosition();
+            return new AsyncChains.Head<>()
+            {
+                @Override
+                protected Cancellable start(BiConsumer<? super T, Throwable> callback)
+                {
+                    return execute(new CallAndCallback<>(call, callback), position);
+                }
+            };
+        }
+
+        @Override
+        public <T> AsyncChain<T> flatChain(Callable<? extends AsyncChain<T>> call)
+        {
+            int position = inheritQueuePosition();
+            return new AsyncChains.Head<>()
+            {
+                @Override
+                protected Cancellable start(BiConsumer<? super T, Throwable> callback)
+                {
+                    return execute(new FlatCallAndCallback<>(call, callback), position);
+                }
+            };
+        }
+
+        @Override
+        public Cancellable execute(RunOrFail runOrFail)
+        {
+            return execute(runOrFail, inheritQueuePosition());
+        }
+
+        private int inheritQueuePosition()
+        {
+            return inExecutor() && task != null ? task.queuePosition : 0;
+        }
+
+        private Cancellable execute(RunOrFail runOrFail, int queuePosition)
+        {
+            PlainChain submit = new PlainChain(runOrFail, SequentialExecutor.this, queuePosition);
+            return AccordExecutor.this.submit(submit);
+        }
+
+        @Override
         public void execute(Runnable run)
         {
-            PlainRunnable submit = new PlainRunnable(null, run, this);
-            if (inExecutor() && this.task != null)
-                submit.queuePosition = this.task.queuePosition;
+            PlainRunnable submit = new PlainRunnable(null, run, this, inheritQueuePosition());
             AccordExecutor.this.submit(submit);
         }
 
         @Override
-        public void maybeExecuteImmediately(Runnable run)
+        public boolean tryExecuteImmediately(Runnable run)
         {
             Thread self = Thread.currentThread();
             Thread owner = this.owner;
-            if (owner == self || (owner == null && ownerUpdater.compareAndSet(this, null, self)))
+            if (owner != null ? owner != self : !ownerUpdater.compareAndSet(this, null, self))
+                return false;
+
+            try { run.run(); }
+            catch (Throwable t) { agent.onUncaughtException(t); }
+            finally
             {
-                try { run.run(); }
-                catch (Throwable t) { agent.onUncaughtException(t); }
-                finally
+                if (owner == null)
                 {
-                    if (owner == null)
-                    {
-                        this.owner = null;
-                        if (waiting != null)
-                            LockSupport.unpark(waiting);
-                    }
+                    this.owner = null;
+                    if (waiting != null)
+                        LockSupport.unpark(waiting);
                 }
             }
-            else
-            {
-                execute(run);
-            }
+            return true;
         }
     }
 
@@ -1287,14 +1318,20 @@
 
         PlainRunnable(Runnable run)
         {
-            this(null, run, null);
+            this(null, run);
         }
 
-        PlainRunnable(AsyncPromise<Void> result, Runnable run, @Nullable SequentialExecutor executor)
+        PlainRunnable(AsyncPromise<Void> result, Runnable run)
+        {
+            this(result, run, null, 0);
+        }
+
+        PlainRunnable(AsyncPromise<Void> result, Runnable run, @Nullable SequentialExecutor executor, int queuePosition)
         {
             this.result = result;
             this.run = run;
             this.executor = executor;
+            this.queuePosition = queuePosition;
         }
 
         @Override
@@ -1514,17 +1551,21 @@
         }
     }
 
-    class PlainChain<T> extends Plain
+    class PlainChain extends Plain
     {
-        final Callable<T> call;
-        final BiConsumer<? super T, Throwable> callback;
+        final RunOrFail runOrFail;
         final @Nullable SequentialExecutor executor;
 
-        PlainChain(Callable<T> call, BiConsumer<? super T, Throwable> callback, @Nullable SequentialExecutor executor)
+        PlainChain(RunOrFail runOrFail)
         {
-            this.call = call;
-            this.callback = callback;
+            this(runOrFail, null, 0);
+        }
+
+        PlainChain(RunOrFail runOrFail, @Nullable SequentialExecutor executor, int queuePosition)
+        {
+            this.runOrFail = runOrFail;
             this.executor = executor;
+            this.queuePosition = queuePosition;
         }
 
         @Override
@@ -1536,23 +1577,14 @@
         @Override
         protected void runInternal()
         {
-            T success;
             try (Closeable close = locals.get())
             {
-                success = call.call();
-            }
-            catch (Throwable t)
-            {
-                fail(t);
-                return;
-            }
-            try
-            {
-                callback.accept(success, null);
+                runOrFail.run();
             }
             catch (Throwable t)
             {
                 agent.onUncaughtException(t);
+                return;
             }
         }
 
@@ -1561,7 +1593,7 @@
         {
             try
             {
-                callback.accept(null, fail);
+                runOrFail.fail(fail);
             }
             catch (Throwable t)
             {
@@ -1571,15 +1603,15 @@
         }
     }
 
-    class DebuggableChain<T> extends PlainChain<T> implements DebuggableTask
+    class DebuggableChain extends PlainChain implements DebuggableTask
     {
         final long createdAtNanos;
         long startedAtNanos;
         final Object describe;
 
-        DebuggableChain(Callable<T> call, BiConsumer<? super T, Throwable> callback, @Nullable SequentialExecutor executor, Object describe)
+        DebuggableChain(RunOrFail runOrFail, @Nullable SequentialExecutor executor, int queuePosition, Object describe)
         {
-            super(call, callback, executor);
+            super(runOrFail, executor, queuePosition);
             this.createdAtNanos = MonotonicClock.Global.approxTime.now();
             this.describe = Invariants.nonNull(describe);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 96b9336..058b917 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import javax.annotation.Nonnull;
@@ -41,6 +42,7 @@
 import org.apache.cassandra.metrics.AccordReplicaMetrics;
 import org.apache.cassandra.service.accord.api.AccordViolationHandler;
 import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,7 +135,6 @@
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.Promise;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static accord.api.Journal.TopologyUpdate;
@@ -553,7 +554,7 @@
     }
 
     @Override
-    public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
+    public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
     {
         return node.durability().sync(requestedBy, ExclusiveSyncPoint, minBound, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
     }
@@ -580,8 +581,8 @@
         return node.withEpochAtLeast(txnId.epoch(), null, () -> {
             Txn txn = new Txn.InMemory(Write, keys, TxnRead.createNoOpRead(keys), TxnQuery.UNSAFE_EMPTY, TxnUpdate.empty(), new TableMetadatasAndKeys(TableMetadatas.none(), keys));
             return CoordinateTransaction.coordinate(node, route, txnId, txn)
-                                        .map(ignore -> (Void) null).beginAsResult();
-        }).beginAsResult();
+                                        .mapToNull();
+        });
     }
 
     @Override
@@ -590,6 +591,53 @@
         return CoordinateMaxConflict.maxConflict(node, ranges);
     }
 
+    static class AsyncFutureCallback<V> extends AsyncFuture<V> implements BiConsumer<V, Throwable>
+    {
+        @Override
+        public void accept(V v, Throwable fail)
+        {
+            if (fail == null) trySuccess(v);
+            else tryFailure(fail);
+        }
+    }
+
+    public static <V> Future<V> toFuture(AsyncChain<V> chain)
+    {
+        AsyncFutureCallback<V> future = new AsyncFutureCallback<>();
+        chain.begin(future);
+        return future;
+    }
+
+    public static <V> Future<V> toFuture(AsyncResult<V> result)
+    {
+        if (result instanceof Future<?>)
+            return (Future<V>) result;
+
+        AsyncPromise<V> promise = new AsyncPromise<>();
+        result.invoke((success, failure) -> {
+            if (failure == null) promise.trySuccess(success);
+            else promise.tryFailure(failure);
+        });
+        return promise;
+    }
+
+    public static <V> V getBlocking(AsyncChain<V> async)
+    {
+        return syncAndRethrow(toFuture(async)).getNow();
+    }
+
+    public static <V> V getBlocking(AsyncResult<V> async)
+    {
+        return syncAndRethrow(toFuture(async)).getNow();
+    }
+
+    private static <V> Future<V> syncAndRethrow(Future<V> future)
+    {
+        future.syncThrowUncheckedOnInterrupt()
+              .rethrowIfFailed();
+        return future;
+    }
+
     public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest)
     {
         return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest);
@@ -602,26 +650,23 @@
         return result.awaitAndGet();
     }
 
+    public static <V> V getBlocking(AsyncResult<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest)
+    {
+        return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest);
+    }
+
+    public static <V> V getBlocking(AsyncResult<V> async, @Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest)
+    {
+        AccordResult<V> result = new AccordResult<>(txnId, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest);
+        async.invoke(result);
+        return result.awaitAndGet();
+    }
+
     public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline)
     {
         return getBlocking(async, keysOrRanges, bookkeeping, startedAt, deadline, false);
     }
 
-    public static <V> V getBlocking(AsyncChain<V> async)
-    {
-        return asPromise(async).syncUninterruptibly().getNow();
-    }
-
-    public static <V> Promise<V> asPromise(AsyncChain<V> async)
-    {
-        AsyncPromise<V> promise = new AsyncPromise<>();
-        async.begin((result, failure) -> {
-            if (failure == null) promise.trySuccess(result);
-            else promise.tryFailure(failure);
-        });
-        return promise;
-    }
-
     public static Keys intersecting(Keys keys)
     {
         if (keys.isEmpty())
@@ -719,7 +764,7 @@
         bookkeeping.metrics.keySize.update(txn.keys().size());
         long deadlineNanos = requestTime.computeDeadline(timeout);
         AccordResult<TxnResult> result = new AccordResult<>(txnId, txn.keys(), bookkeeping, requestTime.startedAtNanos(), deadlineNanos, true);
-        ((AsyncResult)node.coordinate(txnId, txn, minEpoch, deadlineNanos)).begin(result);
+        node.coordinate(txnId, txn, minEpoch, deadlineNanos).begin((BiConsumer) result);
         return result;
     }
 
@@ -754,7 +799,7 @@
         }
         Ready ready = new Ready();
         AccordCommandStores commandStores = (AccordCommandStores) node.commandStores();
-        AsyncChains.getBlockingAndRethrow(commandStores.forEach((PreLoadContext.Empty)() -> "Flush Caches", safeStore -> {
+        getBlocking(commandStores.forEach((PreLoadContext.Empty)() -> "Flush Caches", safeStore -> {
             AccordCommandStore commandStore = (AccordCommandStore)safeStore.commandStore();
             try (AccordCommandStore.ExclusiveCaches caches = commandStore.lockCaches())
             {
@@ -769,7 +814,7 @@
         }));
         ready.decrement();
         AsyncPromise<Void> result = new AsyncPromise<>();
-        ready.begin((success, fail) -> {
+        ready.invoke((success, fail) -> {
             if (fail != null) result.tryFailure(fail);
             else result.trySuccess(null);
         });
@@ -839,19 +884,7 @@
     @Override
     public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId)
     {
-        AsyncChain<List<CommandStoreTxnBlockedGraph>> states = loadDebug(txnId);
-        try
-        {
-            return AsyncChains.getBlocking(states);
-        }
-        catch (InterruptedException e)
-        {
-            throw new UncheckedInterruptedException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e.getCause());
-        }
+        return getBlocking(loadDebug(txnId));
     }
 
     public AsyncChain<List<CommandStoreTxnBlockedGraph>> loadDebug(TxnId original)
@@ -862,11 +895,11 @@
         int[] ids = commandStores.ids();
         List<AsyncChain<CommandStoreTxnBlockedGraph>> chains = new ArrayList<>(ids.length);
         for (int id : ids)
-            chains.add(loadDebug(original, commandStores.forId(id)));
+            chains.add(loadDebug(original, commandStores.forId(id)).chain());
         return AsyncChains.allOf(chains);
     }
 
-    private AsyncChain<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, CommandStore store)
+    private AsyncResult<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, CommandStore store)
     {
         CommandStoreTxnBlockedGraph.Builder state = new CommandStoreTxnBlockedGraph.Builder(store.id());
         populateAsync(state, store, txnId);
@@ -995,7 +1028,7 @@
     @Override
     public void ensureMinHlc(long minHlc)
     {
-        asPromise(node.updateMinHlc(minHlc >= 0 ? minHlc : 0)).syncUninterruptibly();
+        toFuture(node.updateMinHlc(minHlc >= 0 ? minHlc : 0)).syncUninterruptibly();
     }
 
     public AccordJournal journal()
@@ -1006,7 +1039,7 @@
     @Override
     public Future<Void> epochReady(Epoch epoch)
     {
-        return asPromise(configService.epochReady(epoch.getEpoch()));
+        return toFuture(configService.epochReady(epoch.getEpoch()));
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index c6e754b..34453d2 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -28,6 +28,7 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import accord.utils.async.AsyncResult;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@
     IVerbHandler<? extends Request> requestHandler();
     IVerbHandler<? extends Reply> responseHandler();
 
-    AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits);
+    AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits);
     AsyncChain<Void> sync(@Nullable Timestamp minBound, Keys keys, SyncLocal syncLocal, SyncRemote syncRemote);
     AsyncChain<Timestamp> maxConflict(Ranges ranges);
 
@@ -214,7 +215,7 @@
         }
 
         @Override
-        public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
+        public AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
         {
             throw new UnsupportedOperationException("No accord transaction should be executed when accord.enabled = false in cassandra.yaml");
         }
@@ -410,7 +411,7 @@
         }
 
         @Override
-        public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
+        public AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
         {
             return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
index e015a33..7626ee9 100644
--- a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
@@ -33,11 +33,11 @@
     public static final ImmediateAsyncExecutor INSTANCE = new ImmediateAsyncExecutor();
 
     @Override
-    public <T> AsyncChain<T> build(Callable<T> task)
+    public <T> AsyncChain<T> chain(Callable<T> call)
     {
         try
         {
-            return AsyncChains.success(task.call());
+            return AsyncChains.success(call.call());
         }
         catch (Throwable t)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 9204770..be7153f 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -58,8 +58,7 @@
 import accord.utils.UnhandledEnum;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
+import accord.utils.async.Cancellable;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.RequestTimeoutException;
 import org.apache.cassandra.metrics.AccordCoordinatorMetrics;
@@ -392,9 +391,15 @@
             return AsyncChains.success(staleId);
 
         logger.debug("Waiting {} micros for {} to be stale", waitMicros, staleId);
-        AsyncResult.Settable<TxnId> result = AsyncResults.settable();
-        node.scheduler().once(() -> result.setSuccess(staleId), waitMicros, MICROSECONDS);
-        return result;
+        return new AsyncChains.Head<>()
+        {
+            @Override
+            protected @Nullable Cancellable start(BiConsumer<? super TxnId, Throwable> callback)
+            {
+                node.scheduler().once(() -> callback.accept(staleId, null), waitMicros, MICROSECONDS);
+                return null;
+            }
+        };
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index 72ac8d7..597294c 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.BiConsumer;
 
+import accord.api.AsyncExecutor;
 import accord.api.Data;
 import accord.api.Result;
 import accord.coordinate.CoordinationAdapter;
@@ -257,7 +258,7 @@
                              }
 
                              Group group = Group.one(command);
-                             results.add(AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+                             results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
                                  TxnData result = new TxnData();
                                  // Enforcing limits is redundant since we only have a group of size 1, but checking anyways
                                  // documents the requirement here
@@ -293,7 +294,7 @@
 
                 // TODO (required): To make migration work we need to validate that the range is all on Accord
 
-                results.add(AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+                results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
                     TxnData result = new TxnData();
                     try (PartitionIterator iterator = StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))
                     {
@@ -395,7 +396,7 @@
 
     private AsyncChain<Data> executeUnrecoverableRepairUpdate()
     {
-        return AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+        return AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
             UnrecoverableRepairUpdate repairUpdate = (UnrecoverableRepairUpdate)txn.update();
             // TODO (expected): We should send the read in the same message as the commit. This requires refactor ReadData.Kind so that it doesn't specify the ordinal encoding
             // and can be extended similar to MessageType which allows additional types not from Accord to be added
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
index 4f94cf2..51f9360 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
@@ -29,6 +29,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+import accord.api.AsyncExecutor;
 import accord.api.Data;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
@@ -282,7 +283,7 @@
                 return AsyncChains.success(new LocalReadData(new ArrayList<>(), readCommand));
 
             ReadCommand submit = readCommand.withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()), nowInSeconds);
-            return AsyncChains.ofCallable(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
+            return AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
         }
 
         // This path can have a subrange we have never seen before provided by short read protection or read repair so we need to
@@ -297,7 +298,7 @@
                 continue;
             ReadCommand submit = TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) command, intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds);
             TokenKey routingKey = ((TokenRange)r).start();
-            chains.add(AsyncChains.ofCallable(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
+            chains.add(AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
         }
 
         if (chains.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
index 1a9772f..d080386 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import javax.annotation.Nullable;
 
+import accord.api.AsyncExecutor;
 import accord.api.Data;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
@@ -34,7 +35,6 @@
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadRepairVerbHandler;
 import org.apache.cassandra.db.TypeSizes;
@@ -110,11 +110,8 @@
 
     private static final IVersionedSerializer<Data> noop_data_serializer = new IVersionedSerializer<>()
     {
-        @Override
-        public void serialize(Data t, DataOutputPlus out, Version version) throws IOException {}
-        @Override
-        public Data deserialize(DataInputPlus in, Version version) throws IOException { return Data.NOOP_DATA; }
-
+        @Override public void serialize(Data t, DataOutputPlus out, Version version) {}
+        @Override public Data deserialize(DataInputPlus in, Version version) { return Data.NOOP_DATA; }
         public long serializedSize(Data t, Version version) { return 0; }
     };
 
@@ -148,7 +145,7 @@
     protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Participants<?> execute)
     {
         // TODO (required): subtract unavailable ranges, either from read or from response (or on coordinator)
-        return AsyncChains.ofCallable(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
+        return AsyncExecutor.chain(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
                                           ReadRepairVerbHandler.instance.applyMutation(mutation);
                                           return Data.NOOP_DATA;
                                       });
diff --git a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
index 4cddc7c..996c822 100644
--- a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
@@ -164,7 +164,7 @@
             long timeoutNanos = getAccordRepairTimeoutNanos();
             long maxHlc = AccordService.getBlocking(service.maxConflict(ranges).flatMap(conflict -> {
                 Timestamp conflictMax = mergeMax(conflict, minForEpoch(this.minEpoch.getEpoch()));
-                return service.sync("[repairId #" + repairId + ']', conflictMax, Ranges.of(range), null, NoLocal, syncRemote, timeoutNanos, NANOSECONDS).map(ignored -> conflictMax.hlc());
+                return service.sync("[repairId #" + repairId + ']', conflictMax, Ranges.of(range), null, NoLocal, syncRemote, timeoutNanos, NANOSECONDS).map(ignored -> conflictMax.hlc()).chain();
             }), ranges, bookkeeping, start, start + timeoutNanos);
             waiting = null;
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 6160232..a9060c5 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -35,7 +35,7 @@
 import accord.primitives.Timestamp;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncResults;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.PartitionRangeReadCommand;
@@ -231,7 +231,7 @@
     {
         ReadCommand command = deserialize(tables);
         if (command == null)
-            return AsyncResults.success(new TxnData());
+            return AsyncChains.success(new TxnData());
 
         // It's fine for our nowInSeconds to lag slightly our insertion timestamp, as to the user
         // this simply looks like the transaction witnessed TTL'd data and the data then expired
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index 83a287b..e8e70d7 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -27,11 +27,11 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.AsyncExecutor;
 import accord.api.Write;
 import accord.local.CommandStore;
 import accord.local.SafeCommandStore;
@@ -138,13 +138,13 @@
                    '}';
         }
 
-        public AsyncChain<Void> write(Executor executor, TableMetadatas tables, boolean preserveTimestamps, long timestamp)
+        public AsyncChain<Void> write(AsyncExecutor executor, TableMetadatas tables, boolean preserveTimestamps, long timestamp)
         {
             PartitionUpdate update = deserialize(tables);
             if (!preserveTimestamps)
                 update = new PartitionUpdate.Builder(update, 0).updateAllTimestamp(timestamp).build();
             Mutation mutation = new Mutation(update, PotentialTxnConflicts.ALLOW);
-            return AsyncChains.ofRunnable(executor, () -> mutation.apply(false, false));
+            return executor.chain(() -> mutation.apply(false, false));
         }
 
         @Override
@@ -479,9 +479,9 @@
             return AsyncChains.success(null);
 
         if (results.size() == 1)
-            return results.get(0).map(o -> null);
+            return results.get(0).mapToNull();
 
-        return AsyncChains.reduce(results, (i1, i2) -> null, (Void)null).map(ignore -> null);
+        return AsyncChains.reduce(results, (i1, i2) -> null, null);
     }
 
     public long estimatedSizeOnHeap()
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
index 7cf577d..dca31cc 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
@@ -34,6 +34,7 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
 
+import accord.utils.async.AsyncResult;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.internal.ThrowableUtil;
 import org.apache.cassandra.utils.concurrent.ListenerList.CallbackBiConsumerListener;
@@ -317,17 +318,6 @@
     }
 
     /**
-     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
-     *
-     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
-     */
-    @Override
-    public <T> Future<T> map(Function<? super V, ? extends T> mapper)
-    {
-        return map(mapper, null);
-    }
-
-    /**
      * Support more fluid version of {@link com.google.common.util.concurrent.Futures#addCallback}
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
@@ -366,12 +356,12 @@
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
-    protected <T> Future<T> flatMap(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
+    protected <T> Future<T> flatMap(AbstractFuture<T> result, Function<? super V, ? extends AsyncResult<T>> flatMapper, @Nullable Executor executor)
     {
         addListener(() -> {
             try
             {
-                if (isSuccess()) flatMapper.apply(getNow()).addListener(propagate(result));
+                if (isSuccess()) flatMapper.apply(getNow()).invoke(propagateAsConsumer(result));
                 else result.tryFailure(cause());
             }
             catch (Throwable t)
@@ -404,7 +394,7 @@
         addListener(() -> {
             try
             {
-                if (isSuccess()) andThen.apply(getNow()).addListener(propagate(result));
+                if (isSuccess()) andThen.apply(getNow()).addListener(propagateAsListener(result));
                 else result.tryFailure(cause());
             }
             catch (Throwable t)
@@ -553,11 +543,22 @@
     /**
      * @return a listener that will propagate to {@code to} the result of the future it is invoked with
      */
-    private static <V> GenericFutureListener<? extends Future<V>> propagate(AbstractFuture<? super V> to)
+    private static <V> GenericFutureListener<? extends Future<V>> propagateAsListener(AbstractFuture<? super V> to)
     {
         return from -> {
             if (from.isSuccess()) to.trySuccess(from.getNow());
             else to.tryFailure(from.cause());
         };
     }
+
+    /**
+     * @return a listener that will propagate to {@code to} the result of the future it is invoked with
+     */
+    private static <V> BiConsumer<? super V, Throwable> propagateAsConsumer(AbstractFuture<? super V> to)
+    {
+        return (success, fail) -> {
+            if (fail == null) to.trySuccess(success);
+            else to.tryFailure(fail);
+        };
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
index 3e5052e..e381172 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
@@ -27,6 +27,7 @@
 import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
 
 import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.cassandra.utils.concurrent.ListenerList.Waiting;
 
@@ -146,7 +147,7 @@
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
     @Override
-    public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
+    public <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, @Nullable Executor executor)
     {
         return flatMap(new AsyncFuture<>(), flatMapper, executor);
     }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index c5fc022..b8bf62e 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -27,6 +27,8 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
 
+import accord.api.AsyncExecutor;
+import accord.utils.async.AsyncResult;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import io.netty.util.internal.PlatformDependent;
@@ -41,7 +43,7 @@
  * integrating also with our {@link Awaitable} abstraction, to overall improve coherency and clarity in the codebase.
  */
 @Shared(scope = SIMULATION, ancestors = INTERFACES)
-public interface Future<V> extends io.netty.util.concurrent.Future<V>, ListenableFuture<V>, Awaitable
+public interface Future<V> extends io.netty.util.concurrent.Future<V>, ListenableFuture<V>, Awaitable, AsyncResult<V>
 {
     /**
      * Wait indefinitely for this future to complete, throwing any interrupt
@@ -123,6 +125,8 @@
         return awaitUninterruptibly(l, MILLISECONDS);
     }
 
+    default Future<V> invoke(BiConsumer<? super V, Throwable> callback) { return addCallback(callback); }
+
     /**
      * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
      */
@@ -158,7 +162,12 @@
      */
     default <T> Future<T> map(Function<? super V, ? extends T> mapper)
     {
-        return map(mapper, null);
+        return map(mapper, (Executor) null);
+    }
+
+    default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper, AsyncExecutor executor)
+    {
+        return map(mapper, (Executor) executor);
     }
 
     /**
@@ -169,15 +178,20 @@
     /**
      * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
      */
-    default <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper)
+    default <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper)
     {
-        return flatMap(flatMapper, null);
+        return flatMap(flatMapper, (Executor) null);
+    }
+
+    default <T> AsyncResult<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, AsyncExecutor executor)
+    {
+        return flatMap(flatMapper, (Executor) executor);
     }
 
     /**
      * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
      */
-    <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, Executor executor);
+    <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, Executor executor);
 
     /**
      * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
index 287911b..6098f8d 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
@@ -26,6 +26,7 @@
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
 
+import accord.utils.async.AsyncResult;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import static org.apache.cassandra.utils.concurrent.Awaitable.SyncAwaitable.waitUntil;
@@ -106,7 +107,7 @@
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
     @Override
-    public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
+    public <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, @Nullable Executor executor)
     {
         return flatMap(new SyncFuture<>(), flatMapper, executor);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index 696c351..d30c3d8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -62,10 +62,10 @@
 import org.apache.cassandra.utils.FBUtilities;
 import org.assertj.core.api.Assertions;
 
-import static accord.utils.async.AsyncChains.awaitUninterruptiblyAndRethrow;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 
 public class AccordBootstrapTest extends TestBaseImpl
 {
@@ -278,7 +278,7 @@
                         Assert.assertTrue(session.getNumKeyspaceTransfers() > 0);
                     });
 
-                    awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
+                    getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
                         AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
                         Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet()));
                         Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet()));
@@ -321,7 +321,7 @@
                         Assert.assertEquals(key, row.getInt("c"));
                         Assert.assertEquals(key, row.getInt("v"));
 
-                        awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
+                        getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
                             if (safeStore.ranges().currentRanges().contains(partitionKey))
                             {
                                 AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
@@ -464,7 +464,7 @@
 
                             PartitionKey partitionKey = new PartitionKey(tableId, dk);
 
-                            awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
+                            getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
                                                                                           partitionKey.toUnseekable(), moveMax, moveMax,
                                                                                           safeStore -> {
                                 if (!safeStore.ranges().allAt(preMove).contains(partitionKey))
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java
index 945b8d4..a71e779 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java
@@ -30,7 +30,6 @@
 import accord.primitives.Routable;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
-import accord.utils.async.AsyncChains;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
@@ -48,6 +47,7 @@
 
 import static accord.local.LoadKeysFor.READ_WRITE;
 import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 
 public class AccordDropTableBase extends TestBaseImpl
 {
@@ -138,7 +138,7 @@
                 for (int storeId : stores.ids())
                 {
                     AccordCommandStore store = (AccordCommandStore) stores.forId(storeId);
-                    AsyncChains.getUnchecked(store.submit(ctx, input -> {
+                    getBlocking(store.chain(ctx, input -> {
                         AccordSafeCommandStore safe = (AccordSafeCommandStore) input;
                         for (RoutingKey key : safe.commandsForKeysKeys())
                         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
index 7f9fef5..a5a38ed 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -35,7 +35,7 @@
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
-import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncResult;
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -72,6 +72,7 @@
 import static accord.local.LoadKeysFor.READ_WRITE;
 import static java.lang.String.format;
 import static org.apache.cassandra.distributed.test.accord.AccordTestBase.executeWithRetry;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 
 public class AccordIncrementalRepairTest extends TestBaseImpl
 {
@@ -92,16 +93,16 @@
             return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote, 10L, TimeUnit.MINUTES).map(v -> {
                 executedBarriers = true;
                 return v;
-            }).beginAsResult();
+            });
         }
 
         @Override
-        public AsyncResult<Void> sync(@Nullable Timestamp onOrAfter, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote)
+        public AsyncChain<Void> sync(@Nullable Timestamp onOrAfter, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote)
         {
             return delegate.sync(onOrAfter, keys, syncLocal, syncRemote).map(v -> {
                 executedBarriers = true;
                 return v;
-            }).beginAsResult();
+            });
         }
 
         public void reset()
@@ -206,7 +207,7 @@
     {
         Node node = accordService().node();
         AtomicReference<TxnId> waitFor = new AtomicReference<>(null);
-        AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+        getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
             AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore;
             SafeCommandsForKey safeCfk = store.ifLoadedAndInitialised(key);
             if (safeCfk == null)
@@ -228,7 +229,7 @@
             long now = Clock.Global.currentTimeMillis();
             if (now - start > TimeUnit.MINUTES.toMillis(1))
                 throw new AssertionError("Timeout");
-            AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+            AccordService.getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
                 SafeCommand command = safeStore.get(txnId, StoreParticipants.empty(txnId));
                 Assert.assertNotNull(command.current());
                 if (command.current().status().hasBeen(Status.Applied))
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
index ba7eff1..c9ec047 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
@@ -30,7 +30,6 @@
 import accord.local.durability.DurabilityService;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
-import accord.utils.async.AsyncChains;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
@@ -53,6 +52,7 @@
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
 import static org.apache.cassandra.service.accord.AccordKeyspace.JOURNAL;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 
 /**
  * This is a specific history of {@link StatefulJournalRestartTest}, which offers a more general way of testing this
@@ -90,7 +90,7 @@
             Ranges ranges = Ranges.single(TokenRange.fullRange(metadata.id, metadata.partitioner));
             for (int i = 0; i < 10; i++)
             {
-                AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
+                getBlocking(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
 
                 accord.journal().closeCurrentSegmentForTestingIfNonEmpty();
                 accord.journal().runCompactorForTesting();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java
index 76eeda8..acdd4da 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java
@@ -32,7 +32,6 @@
 import accord.primitives.Timestamp;
 import accord.utils.Property;
 import accord.utils.Property.SimpleCommand;
-import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
@@ -47,6 +46,7 @@
 import static accord.utils.Property.stateful;
 import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
 import static org.apache.cassandra.service.accord.AccordKeyspace.JOURNAL;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 
 /**
  * There are 2 errors blocking this test from being run
@@ -119,7 +119,7 @@
             Ranges ranges = Ranges.single(TokenRange.fullRange(metadata.id, metadata.partitioner));
             for (int i = 0; i < 10; i++)
             {
-                AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
+                getBlocking(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
 
                 accord.journal().closeCurrentSegmentForTestingIfNonEmpty();
                 accord.journal().runCompactorForTesting();
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index bc83665..39b693d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -94,12 +94,12 @@
 import static accord.primitives.Routable.Domain.Range;
 import static accord.primitives.Timestamp.Flag.HLC_BOUND;
 import static accord.primitives.Timestamp.Flag.SHARD_BOUND;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
 import static org.apache.cassandra.Util.spinAssertEquals;
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
 import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
 import static org.apache.cassandra.service.accord.AccordKeyspace.COMMANDS_FOR_KEY;
 import static org.apache.cassandra.service.accord.AccordKeyspace.CFKAccessor;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -314,32 +314,32 @@
             PartialDeps partialDeps = Deps.NONE.intersecting(AccordTestUtils.fullRange(txn));
             PartialTxn partialTxn = txn.slice(commandStore.unsafeGetRangesForEpoch().currentRanges(), true);
             Route<?> partialRoute = route.slice(commandStore.unsafeGetRangesForEpoch().currentRanges());
-            getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+            getBlocking(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
                 CheckedCommands.preaccept(safe, txnId, partialTxn, route, (a, b) -> {});
-            }).beginAsResult());
+            }));
             flush(commandStore);
-            getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+            getBlocking(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
                 CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, txnId, partialDeps, (a, b) -> {});
-            }).beginAsResult());
+            }));
             flush(commandStore);
-            getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+            getBlocking(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
                 CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, txnId, partialDeps, (a, b) -> {});
-            }).beginAsResult());
+            }));
             flush(commandStore);
-            getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+            getBlocking(commandStore.chain(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
                 return AccordTestUtils.processTxnResultDirect(safe, txnId, partialTxn, txnId);
-            }).flatMap(i -> i).flatMap(result -> commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+            }).flatMap(i -> i).flatMap(result -> commandStore.chain(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
                 CheckedCommands.apply(safe, txnId, route, txnId, partialDeps, partialTxn, result.left, result.right, (a, b) -> {});
             })));
             flush(commandStore);
             // The apply chain is asychronous, so it is easiest to just spin until it is applied
             // in order to have the updated state in the system table
             spinAssertEquals(true, 5, () -> {
-                return getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+                return getBlocking(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
                     StoreParticipants participants = StoreParticipants.all(route);
                     Command command = safe.get(txnId, participants).current();
                     return command.hasBeen(Status.Applied);
-                }).beginAsResult());
+                }));
             });
             flush(commandStore);
         }
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 4e63ce3..7f7e44f 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db.virtual;
 
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -41,9 +42,10 @@
 import accord.primitives.Status.Durability.HasOutcome;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
-import accord.utils.async.AsyncChains;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.OptionaldPositiveInt;
+import org.apache.cassandra.config.YamlConfigurationLoader;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
@@ -54,6 +56,7 @@
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.TokenRange;
 import org.apache.cassandra.service.accord.api.TokenKey;
@@ -66,6 +69,10 @@
 import static accord.api.ProtocolModifiers.Toggles.SendStableMessages.TO_ALL;
 import static accord.primitives.TxnId.FastPath.Unoptimised;
 import static org.apache.cassandra.Util.spinUntilSuccess;
+import static org.apache.cassandra.net.Verb.ACCORD_APPLY_AND_WAIT_REQ;
+import static org.apache.cassandra.net.Verb.ACCORD_APPLY_REQ;
+import static org.apache.cassandra.net.Verb.ACCORD_BEGIN_RECOVER_REQ;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
 
 public class AccordDebugKeyspaceTest extends CQLTester
@@ -120,15 +127,21 @@
     @BeforeClass
     public static void setUpClass()
     {
+        Config.setOverrideLoadConfig(() -> {
+            Config config = new YamlConfigurationLoader().loadConfig();
+            config.accord.queue_shard_count = new OptionaldPositiveInt(1);
+            config.concurrent_accord_operations = 1;
+            config.accord.command_store_shard_count = new OptionaldPositiveInt(1);
+            config.accord.enable_virtual_debug_only_keyspace = true;
+            return config;
+        });
         daemonInitialization();
-        DatabaseDescriptor.getAccord().queue_shard_count = new OptionaldPositiveInt(1);
-        DatabaseDescriptor.getAccord().command_store_shard_count = new OptionaldPositiveInt(1);
         ProtocolModifiers.Toggles.setSendStableMessages(TO_ALL);
 
         CQLTester.setUpClass();
+        CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
 
         AccordService.startup(ClusterMetadata.current().myNodeId());
-        VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance);
         requireNetwork();
     }
 
@@ -169,7 +182,8 @@
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
             execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1));
-            accord.node().coordinate(id, txn);
+            accord.node().coordinate(id, txn).beginAsResult();
+            filter.appliesTo(id);
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
 
             filter.apply.awaitThrowUncheckedOnInterrupt();
@@ -198,7 +212,7 @@
         TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range, accord.nodeId());
         Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(1)), new TokenKey(tableId, new LongToken(100))));
         Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(100)), new TokenKey(tableId, new LongToken(200))));
-        AsyncChains.getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> {
+        getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> {
             safeStore.commandStore().markShardDurable(safeStore, syncId1, ranges1, HasOutcome.Universal);
             safeStore.commandStore().markShardDurable(safeStore, syncId2, ranges2, HasOutcome.Quorum);
         }));
@@ -218,11 +232,11 @@
         TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
         Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
         String keyStr = txn.keys().get(0).toUnseekable().toString();
-        AsyncChains.getBlocking(accord.node().coordinate(id, txn));
+        getBlocking(accord.node().coordinate(id, txn));
 
         spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
                                           row(id.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name()))));
-        assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), "Applied"));
+        spinUntilSuccess(() -> assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), "Applied")));
         assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null));
         assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr), row(id.toString(), "APPLIED_DURABLE"));
     }
@@ -245,7 +259,8 @@
                                              "    END IF\n" +
                                              "COMMIT TRANSACTION", KEYSPACE, tableName, KEYSPACE, tableName);
             Txn txn = createTxn(insertTxn, 0, 0, 0, 0, 0);
-            accord.node().coordinate(id, txn);
+            filter.appliesTo(id);
+            accord.node().coordinate(id, txn).beginAsResult();
 
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
@@ -280,20 +295,22 @@
                                              "        INSERT INTO %s.%s (k, c, v) VALUES (?, ?, ?);\n" +
                                              "    END IF\n" +
                                              "COMMIT TRANSACTION", KEYSPACE, tableName, KEYSPACE, tableName);
-            accord.node().coordinate(first, createTxn(insertTxn, 0, 0, 0, 0, 0));
+
+            filter.appliesTo(first);
+            accord.node().coordinate(first, createTxn(insertTxn, 0, 0, 0, 0, 0)).beginAsResult();
 
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
-            assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+            spinUntilSuccess(() ->assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
+                       row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))));
 
             filter.apply.awaitThrowUncheckedOnInterrupt();
-            assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name()));
-
-            filter.reset();
+            spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
+                       row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name())));
 
             TxnId second = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
-            accord.node().coordinate(second, createTxn(insertTxn, 0, 0, 0, 0, 0));
+            filter.reset();
+            filter.appliesTo(second);
+            accord.node().coordinate(second, createTxn(insertTxn, 0, 0, 0, 0, 0)).beginAsResult();
 
             filter.commit.awaitThrowUncheckedOnInterrupt();
 
@@ -357,7 +374,8 @@
                                              tableName);
             AccordService accord = accord();
             TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
-            accord.node().coordinate(id, createTxn(insertTxn, 0, 0, 0));
+            filter.appliesTo(id);
+            accord.node().coordinate(id, createTxn(insertTxn, 0, 0, 0)).beginAsResult();
 
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             String QUERY_JOURNAL = String.format("SELECT txn_id, save_status, command_store_id FROM %s.%s WHERE txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.JOURNAL);
@@ -402,6 +420,15 @@
         }
 
         ConcurrentMap<TxnId, ConcurrentSkipListSet<Verb>> txnToVerbs = new ConcurrentHashMap<>();
+        Set<Verb> dropVerbs = Set.of(ACCORD_APPLY_REQ, ACCORD_APPLY_AND_WAIT_REQ, ACCORD_BEGIN_RECOVER_REQ);
+        Set<TxnId> applyTo;
+
+        void appliesTo(TxnId txnId)
+        {
+            if (applyTo == null)
+                applyTo = Collections.newSetFromMap(new ConcurrentHashMap<>());
+            applyTo.add(txnId);
+        }
 
         @Override
         public boolean test(Message<?> msg, InetAddressAndPort to)
@@ -412,6 +439,8 @@
             if (msg.payload instanceof TxnRequest)
             {
                 txnId = ((TxnRequest<?>) msg.payload).txnId;
+                if (applyTo != null && !applyTo.contains(txnId))
+                    return true;
             }
             Set<Verb> seen;
             if (txnId != null)
@@ -424,32 +453,15 @@
                 case ACCORD_APPLY_REQ:
                 case ACCORD_APPLY_AND_WAIT_REQ:
                     apply.signalAll();
-                case ACCORD_BEGIN_RECOVER_REQ:
-                    return false;
+                    break;
                 case ACCORD_PRE_ACCEPT_RSP:
                     preAccept.signalAll();
-                    return true;
+                    break;
                 case ACCORD_COMMIT_REQ:
                 case ACCORD_STABLE_THEN_READ_REQ:
                     commit.signalAll();
-                    return true;
-                case ACCORD_PRE_ACCEPT_REQ:
-                case ACCORD_ACCEPT_REQ:
-                case ACCORD_ACCEPT_RSP:
-                case ACCORD_CHECK_STATUS_REQ:
-                case ACCORD_CHECK_STATUS_RSP:
-                case ACCORD_READ_REQ:
-                case ACCORD_READ_RSP:
-                case ACCORD_AWAIT_REQ:
-                case ACCORD_AWAIT_RSP:
-                case ACCORD_AWAIT_ASYNC_RSP_REQ:
-                    return true;
-                default:
-                    // many code paths don't log the error...
-                    UnsupportedOperationException e = new UnsupportedOperationException(msg.verb().name());
-                    logger.error("Unexpected verb {}", msg.verb(), e);
-                    throw e;
             }
+            return !dropVerbs.contains(msg.verb());
         }
     }
 }
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordAsyncConversionTest.java b/test/unit/org/apache/cassandra/service/accord/AccordAsyncConversionTest.java
new file mode 100644
index 0000000..09cc6e8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/AccordAsyncConversionTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+public class AccordAsyncConversionTest
+{
+    private static final String SUCCESS = "test-success";
+    private static final RuntimeException EXCEPTION = new RuntimeException("test-failure");
+
+    @Test
+    public void testAsyncChainToFutureSuccess()
+    {
+        AsyncChain<String> chain = AsyncChains.success(SUCCESS);
+        Future<String> future = AccordService.toFuture(chain);
+        
+        Assert.assertTrue("should be completed", future.isDone());
+        Assert.assertTrue("should be successful", future.isSuccess());
+        Assert.assertEquals("should contain expected value", SUCCESS, future.getNow());
+    }
+
+    @Test
+    public void testAsyncChainToFutureFailure()
+    {
+        AsyncChain<String> chain = AsyncChains.failure(EXCEPTION);
+        Future<String> future = AccordService.toFuture(chain);
+        
+        Assert.assertTrue("should be completed", future.isDone());
+        Assert.assertFalse("should be failed", future.isSuccess());
+        Assert.assertEquals("should contain expected exception", EXCEPTION, future.cause());
+    }
+
+    @Test
+    public void testAsyncChainToFutureAsyncCompletion() throws InterruptedException
+    {
+        CountDownLatch startLatch = new CountDownLatch(1);
+        AsyncChain<String> chain = new AsyncChains.Head<String>()
+        {
+            @Override
+            protected accord.utils.async.Cancellable start(BiConsumer<? super String, Throwable> callback)
+            {
+                Thread t = new Thread(() -> {
+                    try
+                    {
+                        startLatch.await();
+                        callback.accept(SUCCESS, null);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        callback.accept(null, e);
+                    }
+                });
+                t.start();
+                return () -> t.interrupt();
+            }
+        };
+        
+        Future<String> future = AccordService.toFuture(chain);
+        
+        Assert.assertFalse("should not be completed initially", future.isDone());
+        
+        startLatch.countDown();
+        
+        Assert.assertEquals("should complete with expected value", SUCCESS, future.syncUninterruptibly().getNow());
+    }
+
+    @Test
+    public void testAsyncResultToFutureWhenAlreadyFuture()
+    {
+        AsyncFuture<String> original = ImmediateFuture.success(SUCCESS);
+        Future<String> converted = AccordService.toFuture(original);
+        
+        Assert.assertSame("Should return the same instance when already a Future", original, converted);
+    }
+
+    @Test
+    public void testAsyncResultToFutureSuccess()
+    {
+        AsyncResult<String> result = new TestAsyncResult<>(SUCCESS, null);
+        Future<String> future = AccordService.toFuture(result);
+        
+        Assert.assertTrue("should be completed", future.isDone());
+        Assert.assertTrue("should be successful", future.isSuccess());
+        Assert.assertEquals("should contain expected value", SUCCESS, future.getNow());
+    }
+
+    @Test
+    public void testAsyncResultToFutureFailure()
+    {
+        AsyncResult<String> result = new TestAsyncResult<>(null, EXCEPTION);
+        Future<String> future = AccordService.toFuture(result);
+        
+        Assert.assertTrue("should be completed", future.isDone());
+        Assert.assertFalse("should be failed", future.isSuccess());
+        Assert.assertEquals("should contain expected exception", EXCEPTION, future.cause());
+    }
+
+    @Test
+    public void testGetBlockingAsyncChainSuccess()
+    {
+        AsyncChain<String> chain = AsyncChains.success(SUCCESS);
+        String result = AccordService.getBlocking(chain);
+        
+        Assert.assertEquals("Should return expected value", SUCCESS, result);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testGetBlockingAsyncChainFailure()
+    {
+        AsyncChain<String> chain = AsyncChains.failure(EXCEPTION);
+        AccordService.getBlocking(chain);
+    }
+
+    @Test
+    public void testGetBlockingAsyncResultSuccess()
+    {
+        AsyncResult<String> result = new TestAsyncResult<>(SUCCESS, null);
+        String value = AccordService.getBlocking(result);
+        
+        Assert.assertEquals("Should return expected value", SUCCESS, value);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testGetBlockingAsyncResultFailure()
+    {
+        AsyncResult<String> result = new TestAsyncResult<>(null, EXCEPTION);
+        AccordService.getBlocking(result);
+    }
+
+    @Test
+    public void testAsyncChainCancellation() throws InterruptedException
+    {
+        AtomicBoolean cancelled = new AtomicBoolean(false);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        
+        AsyncChain<String> chain = new AsyncChains.Head<String>()
+        {
+            @Override
+            protected accord.utils.async.Cancellable start(BiConsumer<? super String, Throwable> callback)
+            {
+                Thread t = new Thread(() -> {
+                    try
+                    {
+                        startLatch.await();
+                        Thread.sleep(1000);
+                        callback.accept(SUCCESS, null);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        cancelled.set(true);
+                        callback.accept(null, e);
+                    }
+                });
+                t.start();
+                return () -> {
+                    cancelled.set(true);
+                    t.interrupt();
+                };
+            }
+        };
+        
+        Future<String> future = AccordService.toFuture(chain);
+        startLatch.countDown();
+        
+        boolean cancelResult = future.cancel(true);
+        Thread.sleep(100);
+        
+        Assert.assertTrue("should be cancellable", cancelResult);
+        Assert.assertTrue("should be cancelled", future.isCancelled());
+    }
+
+    @Test
+    public void testMultipleCallbacksOnAsyncResult() throws InterruptedException
+    {
+        AtomicReference<String> callback1Result = new AtomicReference<>();
+        AtomicReference<String> callback2Result = new AtomicReference<>();
+        CountDownLatch callbackLatch = new CountDownLatch(2);
+        
+        AsyncResult<String> result = new TestAsyncResult<>(SUCCESS, null);
+        
+        result.invoke((value, throwable) -> {
+            callback1Result.set(value);
+            callbackLatch.countDown();
+        });
+        
+        result.invoke((value, throwable) -> {
+            callback2Result.set(value);
+            callbackLatch.countDown();
+        });
+        
+        Assert.assertTrue("Both callbacks should be invoked", callbackLatch.await(1, TimeUnit.SECONDS));
+        Assert.assertEquals("First callback should receive correct value", SUCCESS, callback1Result.get());
+        Assert.assertEquals("Second callback should receive correct value", SUCCESS, callback2Result.get());
+    }
+
+    @Test
+    public void testNullAsyncResult()
+    {
+        try
+        {
+            AccordService.toFuture((AsyncResult<String>) null);
+            Assert.fail("Should throw NullPointerException for null AsyncResult");
+        }
+        catch (NullPointerException e)
+        {
+        }
+    }
+
+    @Test
+    public void testNullAsyncChain()
+    {
+        try
+        {
+            AccordService.toFuture((AsyncChain<String>) null);
+            Assert.fail("Should throw NullPointerException for null AsyncChain");
+        }
+        catch (NullPointerException e)
+        {
+            // ignore
+        }
+    }
+
+    @Test
+    public void testFutureInterfaceCompliance()
+    {
+        AsyncChain<String> chain = AsyncChains.success(SUCCESS);
+        Future<String> future = AccordService.toFuture(chain);
+        
+        Assert.assertTrue("should implement AsyncResult", future instanceof AsyncResult);
+        Assert.assertTrue("should be completion-aware", future.isDone());
+        Assert.assertNotNull("should have a result", future.getNow());
+    }
+
+    @Test
+    public void testConcurrentConversion() throws InterruptedException
+    {
+        int threadCount = 10;
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch completeLatch = new CountDownLatch(threadCount);
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        
+        for (int i = 0; i < threadCount; i++)
+        {
+            final int threadIndex = i;
+            Thread t = new Thread(() -> {
+                try
+                {
+                    startLatch.await();
+                    AsyncChain<Integer> chain = AsyncChains.success(threadIndex);
+                    Future<Integer> future = AccordService.toFuture(chain);
+                    Assert.assertEquals("Thread " + threadIndex + " should get correct result",
+                                      Integer.valueOf(threadIndex), future.getNow());
+                }
+                catch (Exception e)
+                {
+                    exception.set(e);
+                }
+                finally
+                {
+                    completeLatch.countDown();
+                }
+            });
+            t.start();
+        }
+        
+        startLatch.countDown();
+        Assert.assertTrue("All threads should complete", completeLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertNull("No exceptions should occur during concurrent conversion", exception.get());
+    }
+
+    private static class TestAsyncResult<V> implements AsyncResult<V>
+    {
+        private final V successValue;
+        private final Throwable failureValue;
+
+        public TestAsyncResult(V successValue, Throwable failureValue)
+        {
+            this.successValue = successValue;
+            this.failureValue = failureValue;
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isSuccess()
+        {
+            return false;
+        }
+
+        @Override
+        public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback)
+        {
+            callback.accept(successValue, failureValue);
+            return null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 3fe68ae..49684ac 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -50,7 +50,6 @@
 import accord.primitives.Writes;
 import accord.utils.ImmutableBitSet;
 import accord.utils.LargeBitSet;
-import accord.utils.async.AsyncChains;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -73,6 +72,7 @@
 import static accord.primitives.Status.Durability.AllQuorums;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 import static org.apache.cassandra.service.accord.AccordTestUtils.Commands.preaccepted;
 import static org.apache.cassandra.service.accord.AccordTestUtils.ballot;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createAccordCommandStore;
@@ -133,7 +133,7 @@
         LargeBitSet waitingOnApply = new LargeBitSet(3);
         waitingOnApply.set(1);
         Command.WaitingOn waitingOn = new Command.WaitingOn(dependencies.keyDeps.keys(), dependencies.rangeDeps, new ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2));
-        Pair<Writes, Result> result = AsyncChains.getBlocking(AccordTestUtils.processTxnResult(commandStore, txnId, txn, executeAt));
+        Pair<Writes, Result> result = getBlocking(AccordTestUtils.processTxnResult(commandStore, txnId, txn, executeAt));
 
         Command expected = Command.Executed.executed(txnId, SaveStatus.Applied, AllQuorums, StoreParticipants.all(route),
                                                      promised, executeAt, txn, dependencies, accepted,
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 7a612c1..b714b83 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -59,8 +59,8 @@
 import static accord.api.ProtocolModifiers.Toggles.filterDuplicateDependenciesFromAcceptReply;
 import static accord.local.LoadKeysFor.READ_WRITE;
 import static accord.messages.Accept.Kind.SLOW;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createAccordCommandStore;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createWriteTxn;
 import static org.apache.cassandra.service.accord.AccordTestUtils.fullRange;
@@ -97,7 +97,7 @@
     public void basicCycleTest() throws Throwable
     {
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
-        getUninterruptibly(commandStore.execute((PreLoadContext.Empty)() -> "Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
+        getBlocking(commandStore.execute((PreLoadContext.Empty)() -> "Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
 
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
         Txn txn = createWriteTxn(1);
@@ -109,7 +109,7 @@
         PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route, 1, 1, 1, partialTxn, null, false, fullRoute);
 
         // Check preaccept
-        getUninterruptibly(commandStore.execute(preAccept, safeStore -> {
+        getBlocking(commandStore.execute(preAccept, safeStore -> {
             SafeCommand safeCommand = safeStore.get(txnId, StoreParticipants.all(route));
             Command before = safeCommand.current();
             PreAccept.PreAcceptReply reply = preAccept.apply(safeStore);
@@ -123,7 +123,7 @@
             AccordTestUtils.appendCommandsBlocking(commandStore, before, after);
         }));
 
-        getUninterruptibly(commandStore.execute(preAccept, safeStore -> {
+        getBlocking(commandStore.execute(preAccept, safeStore -> {
             Command before = safeStore.ifInitialised(txnId).current();
             SafeCommand safeCommand = safeStore.get(txnId, StoreParticipants.all(route));
             Assert.assertEquals(txnId, before.executeAt());
@@ -147,7 +147,7 @@
         }
         Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, SLOW, Ballot.ZERO, executeAt, deps, false);
 
-        getUninterruptibly(commandStore.execute(accept, safeStore -> {
+        getBlocking(commandStore.execute(accept, safeStore -> {
             Command before = safeStore.ifInitialised(txnId).current();
             Accept.AcceptReply reply = accept.apply(safeStore);
             Assert.assertTrue(reply.isOk());
@@ -156,7 +156,7 @@
             AccordTestUtils.appendCommandsBlocking(commandStore, before, after);
         }));
 
-        getUninterruptibly(commandStore.execute(accept, safeStore -> {
+        getBlocking(commandStore.execute(accept, safeStore -> {
             Command before = safeStore.ifInitialised(txnId).current();
             Assert.assertEquals(executeAt, before.executeAt());
             Assert.assertEquals(Status.AcceptedSlow, before.status());
@@ -170,9 +170,9 @@
 
         // check commit
         Commit commit = Commit.SerializerSupport.create(txnId, route, 1, 1, Commit.Kind.StableWithTxnAndDeps, Ballot.ZERO, executeAt, partialTxn, deps, fullRoute);
-        getUninterruptibly(commandStore.execute(commit, commit::apply));
+        getBlocking(commandStore.execute(commit, commit::apply));
 
-        getUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key).toParticipants(), LoadKeys.SYNC, READ_WRITE, "Test"), safeStore -> {
+        getBlocking(commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key).toParticipants(), LoadKeys.SYNC, READ_WRITE, "Test"), safeStore -> {
             Command before = safeStore.ifInitialised(txnId).current();
             Assert.assertEquals(commit.executeAt, before.executeAt());
             Assert.assertTrue(before.hasBeen(Status.Committed));
@@ -189,7 +189,7 @@
     public void computeDeps() throws Throwable
     {
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
-        getUninterruptibly(commandStore.execute((PreLoadContext.Empty)()->"Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
+        getBlocking(commandStore.execute((PreLoadContext.Empty)()->"Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
 
         TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1);
         Txn txn = createWriteTxn(2);
@@ -200,7 +200,7 @@
         PartialTxn partialTxn = txn.intersecting(route, true);
         PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1, route, 1, 1, 1, partialTxn, null, false, fullRoute);
 
-        getUninterruptibly(commandStore.execute(preAccept1, safeStore -> {
+        getBlocking(commandStore.execute(preAccept1, safeStore -> {
             persistDiff(commandStore, safeStore, txnId1, route, () -> {
                 preAccept1.apply(safeStore);
             });
@@ -209,7 +209,7 @@
         // second preaccept should identify txnId1 as a dependency
         TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
         PreAccept preAccept2 = PreAccept.SerializerSupport.create(txnId2, route, 1, 1, 1, partialTxn, null, false, fullRoute);
-        getUninterruptibly(commandStore.execute(preAccept2, safeStore -> {
+        getBlocking(commandStore.execute(preAccept2, safeStore -> {
             persistDiff(commandStore, safeStore, txnId2, route, () -> {
                 PreAccept.PreAcceptReply reply = preAccept2.apply(safeStore);
                 Assert.assertTrue(reply.isOk());
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
index 0dbc166..520e96f 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
@@ -23,7 +23,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -90,8 +89,8 @@
 import static accord.local.LoadKeysFor.READ_WRITE;
 import static accord.local.PreLoadContext.contextFor;
 import static accord.utils.Property.qt;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createAccordCommandStore;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createPartialTxn;
 import static org.apache.cassandra.service.accord.AccordTestUtils.keys;
@@ -128,7 +127,7 @@
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
 
-        getUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), instance -> {
+        getBlocking(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), instance -> {
             // TODO review: This change to `ifInitialized` was done in a lot of places and it doesn't preserve this property
             // I fixed this reference to point to `ifLoadedAndInitialised` and but didn't update other places
             Assert.assertNull(instance.ifInitialised(txnId));
@@ -142,7 +141,7 @@
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
 
-        getUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), safe -> {
+        getBlocking(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), safe -> {
             StoreParticipants participants = StoreParticipants.empty(txnId);
             SafeCommand command = safe.get(txnId, participants);
             Assert.assertNotNull(command);
@@ -156,7 +155,7 @@
         Txn txn = AccordTestUtils.createWriteTxn((int)clock.incrementAndGet());
         TokenKey key = ((PartitionKey) Iterables.getOnlyElement(txn.keys())).toUnseekable();
 
-        getUninterruptibly(commandStore.execute((PreLoadContext.Empty)() -> "Test", instance -> {
+        getBlocking(commandStore.execute((PreLoadContext.Empty)() -> "Test", instance -> {
             SafeCommandsForKey cfk = instance.ifLoadedAndInitialised(key);
             Assert.assertNull(cfk);
         }));
@@ -199,31 +198,24 @@
         route.slice(ranges);
         PartialDeps deps = PartialDeps.builder(ranges, true).build();
 
-        try
+        Command command = getBlocking(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+            CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
+            CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
+            return safe.ifInitialised(txnId).current();
+        }));
+
+        // clear cache
+        try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
         {
-            Command command = getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
-                CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
-                CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
-                return safe.ifInitialised(txnId).current();
-            }).beginAsResult());
-
-            // clear cache
-            try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
-            {
-                long cacheSize = cache.global.capacity();
-                cache.global.setCapacity(0);
-                cache.global.setCapacity(cacheSize);
-            }
-
-            while (commandStore.executor().hasTasks())
-                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
-
-            return command;
+            long cacheSize = cache.global.capacity();
+            cache.global.setCapacity(0);
+            cache.global.setCapacity(cacheSize);
         }
-        catch (ExecutionException e)
-        {
-            throw new AssertionError(e);
-        }
+
+        while (commandStore.executor().hasTasks())
+            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+
+        return command;
     }
 
     private static Command createStableUsingSlowLifeCycle(AccordCommandStore commandStore, TxnId txnId)
@@ -249,33 +241,26 @@
         Route<?> partialRoute = route.slice(ranges);
         PartialDeps deps = PartialDeps.builder(ranges, true).build();
 
-        try
+        Command command = getBlocking(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+            CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
+            CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, executeAt, deps, appendDiffToLog(commandStore));
+            CheckedCommands.commit(safe, SaveStatus.Committed, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
+            CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
+            return safe.ifInitialised(txnId).current();
+        }));
+
+        // clear cache
+        try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
         {
-            Command command = getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
-                CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
-                CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, executeAt, deps, appendDiffToLog(commandStore));
-                CheckedCommands.commit(safe, SaveStatus.Committed, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
-                CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
-                return safe.ifInitialised(txnId).current();
-            }).beginAsResult());
-
-            // clear cache
-            try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
-            {
-                long cacheSize = cache.global.capacity();
-                cache.global.setCapacity(0);
-                cache.global.setCapacity(cacheSize);
-            }
-
-            while (commandStore.executor().hasTasks())
-                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
-
-            return command;
+            long cacheSize = cache.global.capacity();
+            cache.global.setCapacity(0);
+            cache.global.setCapacity(cacheSize);
         }
-        catch (ExecutionException e)
-        {
-            throw new AssertionError(e);
-        }
+
+        while (commandStore.executor().hasTasks())
+            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+
+        return command;
     }
 
     @Test
@@ -318,7 +303,7 @@
                 });
             }
             AccordTask<Void> o1 = AccordTask.create(commandStore, ctx, consumer);
-            AssertionUtils.assertThatThrownBy(() -> getUninterruptibly(o1.chain()))
+            AssertionUtils.assertThatThrownBy(() -> getBlocking(o1.chain()))
                           .hasRootCause()
                           .isInstanceOf(NullPointerException.class)
                           .hasNoSuppressedExceptions();
@@ -343,7 +328,7 @@
                     store.ifInitialised(id).readyToExecute(store);
                 });
             });
-            getUninterruptibly(o2.chain());
+            getBlocking(o2.chain());
             awaitDone(commandStore, ids, participants);
             assertNoReferences(commandStore, ids, participants);
 
@@ -376,7 +361,7 @@
 
             AccordTask<Void> operation = AccordTask.create(commandStore, ctx, consumer);
 
-            AssertionUtils.assertThatThrownBy(() -> getUninterruptibly(operation.chain()))
+            AssertionUtils.assertThatThrownBy(() -> getBlocking(operation.chain()))
                           .hasRootCause()
                           .isInstanceOf(NullPointerException.class)
                           .hasMessage(errorMsg)
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index e96cf8a..72f84c9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -116,9 +116,9 @@
 import static accord.primitives.SaveStatus.PreAccepted;
 import static accord.primitives.Status.Durability.NotDurable;
 import static accord.primitives.Txn.Kind.Write;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
 import static java.lang.String.format;
 import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 
 public class AccordTestUtils
 {
@@ -238,8 +238,8 @@
     public static AsyncChain<Pair<Writes, Result>> processTxnResult(AccordCommandStore commandStore, TxnId txnId, PartialTxn txn, Timestamp executeAt) throws Throwable
     {
         AtomicReference<AsyncChain<Pair<Writes, Result>>> result = new AtomicReference<>();
-        getUninterruptibly(commandStore.execute((PreLoadContext.Empty)() -> "Test",
-                           safeStore -> result.set(processTxnResultDirect(safeStore, txnId, txn, executeAt))));
+        getBlocking(commandStore.execute((PreLoadContext.Empty)() -> "Test",
+                                         safeStore -> result.set(processTxnResultDirect(safeStore, txnId, txn, executeAt))));
         return result.get();
     }
 
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index fec84f5..636f724 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -690,9 +690,9 @@
                     public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, boolean startSync)
                     {
                         AsyncResult<Void> metadata = schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult();
-                        AsyncResult<Void> coordination = metadata.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
-                        AsyncResult<Void> data = coordination.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
-                        AsyncResult<Void> reads = data.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
+                        AsyncResult<Void> coordination = metadata.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult());
+                        AsyncResult<Void> data = coordination.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult());
+                        AsyncResult<Void> reads = data.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult());
                         EpochReady ready = new EpochReady(topology.epoch(), metadata, coordination, data, reads);
 
                         topology().onTopologyUpdate(topology, () -> ready, e -> {});
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index e0d40cd..b9602de 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -77,7 +77,6 @@
 import accord.topology.TopologyManager;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
-import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
@@ -100,6 +99,7 @@
 import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
 import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
 import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 import static org.apache.cassandra.utils.AccordGenerators.fromQT;
 
 public class SimulatedAccordCommandStore implements AutoCloseable
@@ -430,7 +430,7 @@
     {
         var result = processAsync(loadCtx, function);
         processAll();
-        return AsyncChains.getBlocking(result);
+        return getBlocking(result);
     }
 
     public <T extends Reply> AsyncResult<T> processAsync(TxnRequest<T> request)
@@ -440,7 +440,7 @@
 
     public <T extends Reply> AsyncResult<T> processAsync(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
     {
-        return commandStore.submit(loadCtx, function).beginAsResult();
+        return commandStore.submit(loadCtx, function);
     }
 
     public Pair<TxnId, AsyncResult<PreAccept.PreAcceptOk>> enqueuePreAccept(Txn txn, FullRoute<?> route)
@@ -464,7 +464,7 @@
             var reply = br.apply(safe);
             Assertions.assertThat(reply.kind() == BeginRecovery.RecoverReply.Kind.Ok).isTrue();
             return (BeginRecovery.RecoverOk) reply;
-        }).beginAsResult());
+        }));
     }
 
     public void processAll()
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index f7778c6..0fd249a 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -32,6 +32,7 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 
+import accord.api.AsyncExecutor;
 import accord.api.RoutingKey;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.local.Node;
@@ -56,7 +57,6 @@
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.CassandraRelevantProperties;
@@ -77,6 +77,7 @@
 import org.assertj.core.api.Assertions;
 
 import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
 
 public abstract class SimulatedAccordCommandStoreTestBase extends CQLTester
@@ -202,7 +203,7 @@
     {
         var pair = assertDepsMessageAsync(instance, messageType, txn, route, keyConflicts, rangeConflicts);
         instance.processAll();
-        AsyncChains.getBlocking(pair.right);
+        getBlocking(pair.right);
 
         return pair.left;
     }
@@ -241,7 +242,7 @@
         return Pair.create(pair.left, pair.right.map(success -> {
             assertDeps(success.txnId, success.deps, cloneKeyConflicts, cloneRangeConflicts);
             return null;
-        }).beginAsResult());
+        }));
     }
 
     protected static Pair<TxnId, AsyncResult<?>> assertBeginRecoveryAsync(SimulatedAccordCommandStore instance,
@@ -260,7 +261,7 @@
             Deps proposeDeps = LatestDeps.mergeProposal(Collections.singletonList(success), ok -> ok.deps);
             assertDeps(success.txnId, proposeDeps, cloneKeyConflicts, cloneRangeConflicts);
             return null;
-        }).beginAsResult());
+        }));
     }
 
     protected static Pair<TxnId, AsyncResult<?>> assertBeginRecoveryAfterPreAcceptAsync(SimulatedAccordCommandStore instance,
@@ -285,10 +286,10 @@
             assertDeps(success.txnId, success.deps, cloneKeyConflicts, cloneRangeConflicts);
             return success;
         });
-        var delay = preAcceptAsync.flatMap(ignore -> AsyncChains.ofCallable(instance.unorderedScheduled, () -> {
+        var delay = preAcceptAsync.flatMap(ignore -> AsyncExecutor.chain(instance.unorderedScheduled, () -> {
             Ballot ballot = Ballot.fromValues(instance.storeService.epoch(), instance.storeService.now(), nodeId);
             return new BeginRecovery(nodeId, new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, instance.topology), txnId, null, false, txn, route, ballot);
-        }));
+        }).beginAsResult());
         var recoverAsync = delay.flatMap(br -> instance.processAsync(br, safe -> {
             var reply = br.apply(safe);
             Assertions.assertThat(reply.kind() == BeginRecovery.RecoverReply.Kind.Ok).isTrue();
@@ -298,7 +299,7 @@
             return success;
         }));
 
-        return Pair.create(txnId, recoverAsync.beginAsResult());
+        return Pair.create(txnId, recoverAsync);
     }
 
     protected static void assertDeps(TxnId txnId, Deps deps,
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
index d5454e7..d54058b 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
@@ -129,7 +129,7 @@
                             }
                         };
                         instance.maybeCacheEvict(txn.keys().toParticipants());
-                        instance.processAsync(preAccept).begin(counter);
+                        instance.processAsync(preAccept).invoke(counter);
                     }
                     break;
                     default:
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 42fec34..884638d 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -28,7 +28,6 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
@@ -649,14 +648,15 @@
         }
 
         @Override public boolean inStore() { return true; }
+        @Override public AsyncChain<Void> chain(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { throw new UnsupportedOperationException();}
+        @Override public <T> AsyncChain<T> chain(PreLoadContext context, Function<? super SafeCommandStore, T> apply) { throw new UnsupportedOperationException(); }
+
         @Override public Journal.Replayer replayer() { throw new UnsupportedOperationException(); }
 
         @Override protected void ensureDurable(Ranges ranges, RedundantBefore onSuccess) {}
         @Override public Agent agent() { return this; }
-        @Override public AsyncChain<Void> build(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { return null; }
-        @Override public <T> AsyncChain<T> build(PreLoadContext context, Function<? super SafeCommandStore, T> apply) { throw new UnsupportedOperationException(); }
+        @Override public void execute(Runnable run) {}
         @Override public void shutdown() { }
-        @Override public <T> AsyncChain<T> build(Callable<T> task) { throw new UnsupportedOperationException(); }
         @Override public void onFailedBootstrap(int attempts, String phase, Ranges ranges, Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); }
         @Override public void onStale(Timestamp staleSince, Ranges ranges) { throw new UnsupportedOperationException(); }
         @Override public void onUncaughtException(Throwable t) { throw new UnsupportedOperationException(); }
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
index 2ec07f7..6a0a1f5 100644
--- a/test/unit/org/apache/cassandra/utils/AssertionUtils.java
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
@@ -26,8 +26,6 @@
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Condition;
 import org.assertj.core.api.ThrowableAssert;
-import org.assertj.core.error.BasicErrorMessageFactory;
-import org.assertj.core.internal.Failures;
 
 public class AssertionUtils
 {
@@ -191,8 +189,6 @@
         public ThrowableAssertPlus hasRootCause()
         {
             Throwable cause = Throwables.getRootCause(actual);
-            if (cause == actual)
-                throw Failures.instance().failure(this.info, new BasicErrorMessageFactory("%nExpected a root cause but cause was null", new Object[0]));
             return new ThrowableAssertPlus(cause);
         }
     }