Allow exceptions to be propagated remotely

https://github.com/apache/cassandra-accord/pull/56

Patch by Ariel Weisberg; Reviewed by David Capwell for CASSANDRA-18779
diff --git a/accord-core/src/main/java/accord/api/MessageSink.java b/accord-core/src/main/java/accord/api/MessageSink.java
index ee4d681..47a3fa0 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/api/MessageSink.java
@@ -30,4 +30,5 @@
     void send(Id to, Request request);
     void send(Id to, Request request, AgentExecutor executor, Callback callback);
     void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
+    void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure);
 }
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index daaf81f..1b5b39b 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -77,11 +77,6 @@
                     tryFailure(new RuntimeException("Unexpected reply"));
                     return;
 
-                case Error:
-                    // TODO (required): error propagation
-                    tryFailure(new RuntimeException("Unknown error"));
-                    return;
-
                 case Invalid:
                     tryFailure(new Invalidated(exclusiveSyncPoint.syncId, exclusiveSyncPoint.homeKey));
                     return;
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index da39a05..7a64395 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -24,14 +24,20 @@
 import accord.api.Data;
 import accord.api.Result;
 import accord.local.Node;
-import accord.messages.ReadTxnData;
+import accord.local.Node.Id;
+import accord.messages.Commit;
 import accord.messages.ReadData.ReadNack;
 import accord.messages.ReadData.ReadOk;
 import accord.messages.ReadData.ReadReply;
-import accord.primitives.*;
+import accord.messages.ReadTxnData;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import accord.local.Node.Id;
-import accord.messages.Commit;
 import accord.topology.Topology;
 
 import static accord.coordinate.ReadCoordinator.Action.Approve;
@@ -111,9 +117,6 @@
         switch (nack)
         {
             default: throw new IllegalStateException();
-            case Error:
-                // TODO (expected): report content of error
-                return Action.Reject;
             case Redundant:
                 callback.accept(null, new Preempted(txnId, route.homeKey()));
                 return Action.Aborted;
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index eedf19d..aaeeb86 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -154,8 +154,6 @@
                             case Invalid:
                             case Redundant:
                                 throw new AssertionError(String.format("Unexpected reply: %s", reply));
-                            case Error:
-                                // TODO (required): ensure errors are propagated to coordinators and can be logged
                         }
                     }
                     return;
@@ -273,9 +271,11 @@
         }
 
         @Override
-        protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+        protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail)
         {
-            node.reply(replyTo, replyContext, new FetchResponse(unavailable, data, maxApplied));
+            // TODO (review): If the fetch response actually does some streaming, but we send back the error
+            // it is a lot of work and data that might move and be unaccounted for at the coordinator
+            node.reply(replyTo, replyContext, fail == null ? new FetchResponse(unavailable, data, maxApplied) : null, fail);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 7cf84df..1916f0b 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -34,7 +34,6 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.primitives.EpochSupplier;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +59,7 @@
 import accord.messages.Request;
 import accord.messages.TxnRequest;
 import accord.primitives.Ballot;
+import accord.primitives.EpochSupplier;
 import accord.primitives.FullRoute;
 import accord.primitives.ProgressToken;
 import accord.primitives.Range;
@@ -475,10 +475,17 @@
         messageSink.send(to, send);
     }
 
-    public void reply(Id replyingToNode, ReplyContext replyContext, Reply send)
+    public void reply(Id replyingToNode, ReplyContext replyContext, Reply send, Throwable failure)
     {
-        // TODO (usability, now): add Throwable as an argument so the error check is here, every single message gets this wrong causing a NPE here
-        if (send == null)
+        if (failure != null)
+        {
+            agent.onUncaughtException(failure);
+            if (send != null)
+                agent().onUncaughtException(new IllegalArgumentException(String.format("fail (%s) and send (%s) are both not null", failure, send)));
+            messageSink.replyWithUnknownFailure(replyingToNode, replyContext, failure);
+            return;
+        }
+        else if (send == null)
         {
             NullPointerException e = new NullPointerException();
             agent.onUncaughtException(e);
@@ -633,7 +640,16 @@
                 return;
             }
         }
-        scheduler.now(() -> request.process(this, from, replyContext));
+        scheduler.now(() -> {
+            try
+            {
+                request.process(this, from, replyContext);
+            }
+            catch (Throwable t)
+            {
+                reply(from, replyContext, null, t);
+            }
+        });
     }
 
     public Scheduler scheduler()
diff --git a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
index fe1790f..482fcf3 100644
--- a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
+++ b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
@@ -56,7 +56,7 @@
     @Override
     public void accept(R reply, Throwable failure)
     {
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
index a55bcf4..1554e5d 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -18,21 +18,30 @@
 
 package accord.messages;
 
-import accord.local.Commands;
-import accord.local.Commands.AcceptOutcome;
-import accord.local.SafeCommand;
-import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.local.Node.Id;
-import accord.topology.Topologies;
-
-import accord.api.RoutingKey;
-
 import javax.annotation.Nonnull;
-
 import javax.annotation.Nullable;
 
-import static accord.local.Commands.AcceptOutcome.*;
+import accord.api.RoutingKey;
+import accord.local.Commands;
+import accord.local.Commands.AcceptOutcome;
+import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import static accord.local.Commands.AcceptOutcome.Redundant;
+import static accord.local.Commands.AcceptOutcome.RejectedBallot;
+import static accord.local.Commands.AcceptOutcome.Success;
+import static accord.local.Commands.AcceptOutcome.Truncated;
 
 // TODO (low priority, efficiency): use different objects for send and receive, so can be more efficient
 //                                  (e.g. serialize without slicing, and without unnecessary fields)
@@ -118,7 +127,7 @@
     @Override
     public void accept(AcceptReply reply, Throwable failure)
     {
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java
index da06f6b..12d5614 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -153,9 +153,7 @@
     @Override
     public void accept(ApplyReply reply, Throwable failure)
     {
-        if (failure != null)
-            node.agent().onUncaughtException(failure);
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index f1a16b8..1efd652 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -18,25 +18,43 @@
 
 package accord.messages;
 
-import accord.api.Result;
-import accord.local.*;
-import accord.primitives.*;
-import accord.topology.Topologies;
-
 import java.util.List;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.utils.Invariants;
-
+import accord.api.Result;
+import accord.local.Command;
+import accord.local.Commands;
 import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.Status;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
 
 import static accord.local.SafeCommandStore.TestDep.WITH;
 import static accord.local.SafeCommandStore.TestDep.WITHOUT;
 import static accord.local.SafeCommandStore.TestKind.shouldHaveWitnessed;
-import static accord.local.SafeCommandStore.TestTimestamp.*;
-import static accord.local.Status.*;
+import static accord.local.SafeCommandStore.TestTimestamp.EXECUTES_AFTER;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_AFTER;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
+import static accord.local.Status.Accepted;
+import static accord.local.Status.Committed;
+import static accord.local.Status.Phase;
+import static accord.local.Status.PreAccepted;
+import static accord.local.Status.PreCommitted;
 import static accord.messages.PreAccept.calculatePartialDeps;
 
 public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply>
@@ -179,7 +197,7 @@
     @Override
     public void accept(RecoverReply reply, Throwable failure)
     {
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java
index a18b2c5..4c8ae20 100644
--- a/accord-core/src/main/java/accord/messages/CheckStatus.java
+++ b/accord-core/src/main/java/accord/messages/CheckStatus.java
@@ -18,6 +18,8 @@
 
 package accord.messages;
 
+import javax.annotation.Nullable;
+
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.coordinate.Infer;
@@ -46,7 +48,6 @@
 import accord.topology.Topologies;
 import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
-import javax.annotation.Nullable;
 
 import static accord.local.Status.Committed;
 import static accord.local.Status.Definition;
@@ -175,8 +176,9 @@
     @Override
     public void accept(CheckStatusReply ok, Throwable failure)
     {
-        if (ok == null) node.reply(replyTo, replyContext, CheckStatusNack.NotOwned);
-        else node.reply(replyTo, replyContext, ok);
+        if (failure != null) node.reply(replyTo, replyContext, ok, failure);
+        else if (ok == null) node.reply(replyTo, replyContext, CheckStatusNack.NotOwned, null);
+        else node.reply(replyTo, replyContext, ok, null);
     }
 
     private Status invalidIfNotAtLeast(SafeCommandStore safeStore)
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index 3954a8e..b755827 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -19,21 +19,37 @@
 package accord.messages;
 
 import java.util.Set;
-
-import accord.local.*;
-import accord.messages.ReadData.ReadNack;
-import accord.messages.ReadData.ReadReply;
-import accord.primitives.*;
-import accord.local.Node.Id;
-import accord.topology.Topologies;
 import javax.annotation.Nullable;
 
-import accord.utils.Invariants;
-
-import accord.topology.Topology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.local.Commands;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.messages.ReadData.ReadNack;
+import accord.messages.ReadData.ReadReply;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+
 import static accord.local.Status.Committed;
 import static accord.local.Status.Known.DefinitionOnly;
 
@@ -177,14 +193,8 @@
     @Override
     public synchronized void accept(ReadNack reply, Throwable failure)
     {
-        if (failure != null)
-        {
-            logger.error("Unhandled exception during commit", failure);
-            node.agent().onUncaughtException(failure);
-            return;
-        }
-        if (reply != null)
-            node.reply(replyTo, replyContext, reply);
+        if (reply != null || failure != null)
+            node.reply(replyTo, replyContext, reply, failure);
         else if (read != null)
             read.process(node, replyTo, replyContext);
         if (defer != null)
diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java b/accord-core/src/main/java/accord/messages/GetDeps.java
index f757aa7..083c15c 100644
--- a/accord-core/src/main/java/accord/messages/GetDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetDeps.java
@@ -18,14 +18,19 @@
 
 package accord.messages;
 
-import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.utils.Invariants;
+import javax.annotation.Nonnull;
 
 import accord.local.Node.Id;
+import accord.local.SafeCommandStore;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
-
-import javax.annotation.Nonnull;
+import accord.utils.Invariants;
 
 import static accord.messages.PreAccept.calculatePartialDeps;
 
@@ -78,8 +83,7 @@
     @Override
     public void accept(PartialDeps result, Throwable failure)
     {
-        if (result == null) node.agent().onUncaughtException(failure); // TODO (expected): propagate failures to coordinator
-        else node.reply(replyTo, replyContext, new GetDepsOk(result));
+        node.reply(replyTo, replyContext, result != null ? new GetDepsOk(result) : null, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java
index 79a1612..d9f4221 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -19,10 +19,17 @@
 package accord.messages;
 
 import accord.api.ProgressLog.ProgressShard;
-import accord.local.*;
+import accord.local.Commands;
 import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.Status;
 import accord.local.Status.Durability;
-import accord.primitives.*;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialRoute;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.utils.Invariants;
 
@@ -104,14 +111,7 @@
     @Override
     public void accept(Reply reply, Throwable failure)
     {
-        // TODO: respond with failure
-        if (reply == null)
-        {
-            if (failure == null)
-                throw new IllegalStateException("Processed nothing on this node");
-            throw new IllegalStateException(failure);
-        }
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index 8b68436..7909157 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -54,6 +54,7 @@
     SET_GLOBALLY_DURABLE_REQ (true ),
     QUERY_DURABLE_BEFORE_REQ (false),
     QUERY_DURABLE_BEFORE_RSP (false),
+    FAILURE_RSP              (false),
     ;
 
     /**
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index d035e8e..1be51ca 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -20,18 +20,28 @@
 
 import java.util.List;
 import java.util.Objects;
-
-import accord.local.*;
-import accord.local.SafeCommandStore.TestKind;
-
-import accord.local.Node.Id;
-import accord.messages.TxnRequest.WithUnsynced;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-
 import javax.annotation.Nullable;
 
-import accord.primitives.*;
+import accord.local.Command;
+import accord.local.Commands;
+import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SafeCommandStore.TestKind;
+import accord.messages.TxnRequest.WithUnsynced;
+import accord.primitives.Deps;
+import accord.primitives.EpochSupplier;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Shard;
+import accord.topology.Topologies;
 
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
@@ -147,8 +157,7 @@
     @Override
     public void accept(PreAcceptReply reply, Throwable failure)
     {
-        // TODO (required, error handling): communicate back the failure
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 618e147..66b5234 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -21,21 +21,20 @@
 import java.util.BitSet;
 import javax.annotation.Nullable;
 
-import accord.api.Data;
-import accord.primitives.Participants;
-import accord.topology.Topologies;
-import accord.utils.Invariants;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Data;
 import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
 import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
 
 import static accord.messages.MessageType.READ_RSP;
 import static accord.messages.TxnRequest.computeWaitForEpoch;
@@ -71,7 +70,7 @@
 
     protected abstract void cancel();
     protected abstract long executeAtEpoch();
-    protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data);
+    protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail);
 
     @Override
     public long waitForEpoch()
@@ -99,12 +98,12 @@
     {
         if (reply != null)
         {
-            node.reply(replyTo, replyContext, reply);
+            node.reply(replyTo, replyContext, reply, failure);
         }
         else if (failure != null)
         {
             // TODO (expected, testing): test
-            node.reply(replyTo, replyContext, ReadNack.Error);
+            node.reply(replyTo, replyContext, null, failure);
             data = null;
             // TODO (expected, exceptions): probably a better way to handle this, as might not be uncaught
             node.agent().onUncaughtException(failure);
@@ -129,7 +128,7 @@
         // and prevents races where we respond before dispatching all the required reads (if the reads are
         // completing faster than the reads can be setup on all required shards)
         if (-1 == --waitingOnCount)
-            reply(this.unavailable, data);
+            reply(this.unavailable, data, null);
     }
 
     protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable)
@@ -153,7 +152,8 @@
             {
                 // TODO (expected, exceptions): should send exception to client, and consistency handle/propagate locally
                 logger.trace("{}: read failed for {}: {}", txnId, unsafeStore, throwable);
-                node.reply(replyTo, replyContext, ReadNack.Error);
+                node.reply(replyTo, replyContext, null, throwable);
+                cancel();
             }
             else
                 readComplete(unsafeStore, next, unavailable);
@@ -173,7 +173,7 @@
 
     public enum ReadNack implements ReadReply
     {
-        Invalid, NotCommitted, Redundant, Error;
+        Invalid, NotCommitted, Redundant;
 
         @Override
         public String toString()
diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/ReadTxnData.java
index b34e9ae..4e31910 100644
--- a/accord-core/src/main/java/accord/messages/ReadTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java
@@ -199,7 +199,7 @@
         if (state == State.PENDING)
         {
             state = State.OBSOLETE;
-            node.reply(replyTo, replyContext, Redundant);
+            node.reply(replyTo, replyContext, Redundant, null);
         }
     }
 
@@ -231,18 +231,20 @@
     }
 
     @Override
-    protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+    protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail)
     {
         switch (state)
         {
             case RETURNED:
-                throw new IllegalStateException("ReadOk was sent, yet ack called again");
+                throw new IllegalStateException("ReadOk was sent, yet ack called again", fail);
             case OBSOLETE:
                 logger.debug("After the read completed for txn {}, the result was marked obsolete", txnId);
+                if (fail != null)
+                    node.agent().onUncaughtException(fail);
                 break;
             case PENDING:
                 state = State.RETURNED;
-                node.reply(replyTo, replyContext, new ReadOk(unavailable, data));
+                node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail);
                 break;
             default:
                 throw new AssertionError("Unknown state: " + state);
diff --git a/accord-core/src/main/java/accord/messages/Reply.java b/accord-core/src/main/java/accord/messages/Reply.java
index e82c4f9..e1fc7e0 100644
--- a/accord-core/src/main/java/accord/messages/Reply.java
+++ b/accord-core/src/main/java/accord/messages/Reply.java
@@ -18,7 +18,28 @@
 
 package accord.messages;
 
+import javax.annotation.Nonnull;
+
+import static accord.messages.MessageType.FAILURE_RSP;
+
 public interface Reply extends Message
 {
     default boolean isFinal() { return true; }
+
+    class FailureReply implements Reply
+    {
+        @Nonnull
+        public final Throwable failure;
+
+        public FailureReply(@Nonnull Throwable failure)
+        {
+            this.failure = failure;
+        }
+
+        @Override
+        public MessageType type()
+        {
+            return FAILURE_RSP;
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
index 5ec41e0..ca52b69 100644
--- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -20,16 +20,21 @@
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import accord.local.*;
-import accord.local.Node.Id;
-import accord.primitives.*;
-import accord.utils.MapReduceConsume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
-
+import accord.local.Command;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Participants;
+import accord.primitives.TxnId;
 import accord.topology.Topology;
+import accord.utils.MapReduceConsume;
+
+import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
 
 public class WaitOnCommit implements Request, MapReduceConsume<SafeCommandStore, Void>, PreLoadContext, Command.TransientListener
 {
@@ -141,13 +146,35 @@
     @Override
     public void accept(Void result, Throwable failure)
     {
-        ack();
+        if (failure != null)
+        {
+            // Claim the single reply slot by forcing the counter to -1, the value at which ack() sends its reply
+            while (true)
+            {
+                int initialValue = waitingOnUpdater.get(this);
+                if (initialValue == -1)
+                {
+                    node.agent().onUncaughtException(new IllegalStateException("Had error in WaitOnCommit, but already replied so can't send failure response", failure));
+                    break;
+                }
+                if (waitingOnUpdater.compareAndSet(this, initialValue, -1))
+                {
+                    node.reply(replyTo, replyContext, null, failure);
+                    // Stop here: looping again would observe our own -1 and misreport "already replied"
+                    break;
+                }
+            }
+        }
+        else
+        {
+            ack();
+        }
     }
 
     private void ack()
     {
         if (waitingOnUpdater.decrementAndGet(this) == -1)
-            node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE);
+            node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE, null);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index 06905dd..682c33b 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -193,7 +193,7 @@
             return;
 
         isInvalid = true;
-        node.reply(replyTo, replyContext, Invalid);
+        node.reply(replyTo, replyContext, Invalid, null);
     }
 
     synchronized void sendTruncated()
@@ -202,7 +202,7 @@
             return;
 
         isInvalid = true;
-        node.reply(replyTo, replyContext, Redundant);
+        node.reply(replyTo, replyContext, Redundant, null);
     }
 
     void applied(SafeCommandStore safeStore, SafeCommand safeCommand)
@@ -223,12 +223,13 @@
     }
 
     @Override
-    protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+    protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail)
     {
         if (isInvalid)
             return;
 
-        node.reply(replyTo, replyContext, new ReadOk(unavailable, data));
+        // data can be null so send the failure response if a failure is present
+        node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail);
     }
 
     private void removeListener(SafeCommandStore safeStore, TxnId txnId)
diff --git a/accord-core/src/main/java/accord/utils/Utils.java b/accord-core/src/main/java/accord/utils/Utils.java
index 3ce0cf1..696edb9 100644
--- a/accord-core/src/main/java/accord/utils/Utils.java
+++ b/accord-core/src/main/java/accord/utils/Utils.java
@@ -18,13 +18,23 @@
 
 package accord.utils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.IntFunction;
+
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
 
-import java.util.*;
-import java.util.function.IntFunction;
-
 // TODO (low priority): remove when jdk8 support is dropped
 public class Utils
 {
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index dfa7fe0..5773e68 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -18,9 +18,6 @@
 
 package accord.utils.async;
 
-import accord.api.VisibleForImplementation;
-import accord.utils.Invariants;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -28,6 +25,9 @@
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import accord.api.VisibleForImplementation;
+import accord.utils.Invariants;
+
 public class AsyncResults
 {
     public static final AsyncResult<Void> SUCCESS_VOID = success(null);
@@ -104,7 +104,7 @@
             }
         }
 
-        boolean trySetResult(V result, Throwable failure)
+        protected boolean trySetResult(V result, Throwable failure)
         {
             return trySetResult(new Result<>(result, failure));
         }
@@ -323,7 +323,7 @@
     @VisibleForImplementation
     public static class RunnableResult<V> extends AbstractResult<V> implements Runnable
     {
-        private final Callable<V> callable;
+        protected final Callable<V> callable;
 
         public RunnableResult(Callable<V> callable)
         {
@@ -333,14 +333,19 @@
         @Override
         public void run()
         {
+            // There are two different types of exceptions: the user function throws, or a listener throws.  To keep these distinct,
+            // catch only the exception from the user function and set it as the failure, and let listener exceptions bubble up.
+            V call;
             try
             {
-                trySetResult(callable.call(), null);
+                call = callable.call();
             }
             catch (Throwable t)
             {
                 trySetResult(null, t);
+                return;
             }
+            trySetResult(call, null);
         }
     }
 
diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 4c9ce6c..01ea96d 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -18,16 +18,6 @@
 
 package accord.burn;
 
-import accord.api.TestableConfigurationService;
-import accord.local.AgentExecutor;
-import accord.impl.AbstractConfigurationService;
-import accord.primitives.Ranges;
-import accord.utils.RandomSource;
-import accord.local.Node;
-import accord.messages.*;
-import accord.topology.Topology;
-import accord.utils.async.AsyncResults;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -35,6 +25,20 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import accord.api.TestableConfigurationService;
+import accord.impl.AbstractConfigurationService;
+import accord.local.AgentExecutor;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.Reply;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.primitives.Ranges;
+import accord.topology.Topology;
+import accord.utils.RandomSource;
+import accord.utils.async.AsyncResults;
+
 public class BurnTestConfigurationService extends AbstractConfigurationService.Minimal implements TestableConfigurationService
 {
     private final AgentExecutor executor;
@@ -66,7 +70,7 @@
         public void process(Node on, Node.Id from, ReplyContext replyContext)
         {
             Topology topology = on.configService().getTopologyForEpoch(epoch);
-            on.reply(from, replyContext, new FetchTopologyReply(topology));
+            on.reply(from, replyContext, new FetchTopologyReply(topology), null);
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index 20f1705..e14fb52 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -23,16 +23,17 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import accord.api.MessageSink;
 import accord.local.AgentExecutor;
-import accord.messages.SafeCallback;
-import accord.utils.RandomSource;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.api.MessageSink;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
+import accord.messages.SafeCallback;
+import accord.utils.RandomSource;
 
 import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
 
@@ -82,4 +83,10 @@
     {
         parent.add(self, replyToNode, Packet.getMessageId(replyContext), reply);
     }
+
+    @Override
+    public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+    {
+        reply(replyingToNode, replyContext, new FailureReply(failure));
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 63fbae8..2f3bd1b 100644
--- a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -98,7 +98,7 @@
                 // run without setting the result
                 try
                 {
-                    fn.call();
+                    callable.call();
                     long nowMillis = pending.nowInMillis();
                     if (periodMillis > 0)
                     {
@@ -117,7 +117,7 @@
                 }
                 catch (Throwable t)
                 {
-                    setFailure(t);
+                    trySetResult(null, t);
                 }
             }
         }
diff --git a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
index 059bc63..0e73309 100644
--- a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
@@ -32,26 +32,11 @@
 
 public abstract class TaskExecutorService extends AbstractExecutorService implements AgentExecutor
 {
-    public static class Task<T> extends AsyncResults.SettableResult<T> implements Pending, RunnableFuture<T>
+    public static class Task<T> extends AsyncResults.RunnableResult<T> implements Pending, RunnableFuture<T>
     {
-        protected final Callable<T> fn;
-
         public Task(Callable<T> fn)
         {
-            this.fn = fn;
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                setSuccess(fn.call());
-            }
-            catch (Throwable t)
-            {
-                setFailure(t);
-            }
+            super(fn);
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index a0b55d3..5fb3924 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -20,14 +20,20 @@
 
 import java.util.function.Consumer;
 
+import accord.api.Agent;
+import accord.api.Result;
 import accord.impl.mock.Network;
 import accord.local.Command;
 import accord.local.Node;
-import accord.api.Agent;
-import accord.api.Result;
-import accord.primitives.*;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 
 import static accord.local.Node.Id.NONE;
+import static accord.utils.Invariants.checkState;
 import static com.google.common.base.Functions.identity;
 
 public class ListAgent implements Agent
@@ -46,11 +52,16 @@
     @Override
     public void onRecover(Node node, Result success, Throwable fail)
     {
+        if (fail != null)
+        {
+            checkState(success == null, "fail (%s) and success (%s) are both not null", fail, success);
+            // Failures from Recover are not processed here, even though the interface provides them
+        }
         if (success != null)
         {
             ListResult result = (ListResult) success;
             if (result.requestId > Integer.MIN_VALUE)
-                node.reply(result.client, Network.replyCtxFor(result.requestId), result);
+                node.reply(result.client, Network.replyCtxFor(result.requestId), result, null);
         }
     }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index b0874c6..c6127a3 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -35,9 +35,9 @@
 import accord.messages.CheckStatus.IncludeInfo;
 import accord.messages.MessageType;
 import accord.messages.ReplyContext;
+import accord.messages.Request;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Txn;
-import accord.messages.Request;
 import accord.primitives.TxnId;
 
 import static accord.local.Status.Phase.Cleanup;
@@ -107,41 +107,36 @@
         @Override
         public void accept(Result success, Throwable fail)
         {
-            // TODO (desired, testing): error handling
-            if (success != null)
-            {
-                node.reply(client, replyContext, (ListResult) success);
-            }
-            else if (fail instanceof CoordinationFailed)
+            if (fail instanceof CoordinationFailed)
             {
                 RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
                 TxnId txnId = ((CoordinationFailed) fail).txnId();
                 if (fail instanceof Invalidated)
                 {
-                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null));
+                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
                     return;
                 }
 
-                node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null));
+                node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null), null);
                 ((Cluster)node.scheduler()).onDone(() -> {
                     node.commandStores()
                         .select(homeKey)
                         .execute(() -> CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f) -> {
                             if (f != null)
                             {
-                                node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null));
+                                node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null), null);
                                 return;
                             }
                             switch (s)
                             {
                                 case Truncated:
-                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null));
+                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null), null);
                                     break;
                                 case Invalidated:
-                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null));
+                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
                                     break;
                                 case Lost:
-                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null));
+                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null), null);
                                     break;
                                 case Other:
                                     // currently caught elsewhere in response tracking, but might help to throw an exception here
@@ -149,6 +144,10 @@
                         }));
                 });
             }
+            else
+            {
+                node.reply(client, replyContext, (ListResult) success, fail);
+            }
         }
     }
 
diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
index e33314d..ba0502e 100644
--- a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
+++ b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
@@ -18,11 +18,13 @@
 
 package accord.impl.mock;
 
+import accord.api.MessageSink;
 import accord.local.AgentExecutor;
 import accord.local.Node;
-import accord.api.MessageSink;
+import accord.local.Node.Id;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 
@@ -54,4 +56,10 @@
     {
         network.reply(node, replyingToNode, Network.getMessageId(replyContext), reply);
     }
+
+    @Override
+    public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+    {
+        network.reply(node, replyingToNode, Network.getMessageId(replyContext), new FailureReply(failure));
+    }
 }
diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java b/accord-core/src/test/java/accord/utils/MessageTask.java
index d1064e3..528a6e4 100644
--- a/accord-core/src/test/java/accord/utils/MessageTask.java
+++ b/accord-core/src/test/java/accord/utils/MessageTask.java
@@ -18,16 +18,25 @@
 
 package accord.utils;
 
-import accord.local.AgentExecutor;
-import accord.local.Node;
-import accord.messages.*;
-import accord.utils.async.AsyncResults;
-import com.google.common.collect.ImmutableList;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import com.google.common.collect.ImmutableList;
+
+import accord.local.AgentExecutor;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.Reply;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.utils.async.AsyncResults;
+
 /**
  * Message task that will continue sending messages to a set of nodes until all
  * nodes ack the message.
@@ -88,7 +97,7 @@
         @Override
         public void process(Node on, Node.Id from, ReplyContext replyContext)
         {
-            process.process(on, from, success -> on.reply(from, replyContext, success ? SUCCESS : FAILURE));
+            process.process(on, from, success -> on.reply(from, replyContext, success ? SUCCESS : FAILURE, null));
         }
 
         @Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 51da1b3..cd7741b 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -51,6 +51,7 @@
 import accord.local.ShardDistributor;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
@@ -120,6 +121,12 @@
             long replyToMessage = ((Packet) replyContext).body.msg_id;
             parent.add(self, replyToNode, replyToMessage, reply);
         }
+
+        @Override
+        public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+        {
+            reply(replyingToNode, replyContext, new FailureReply(failure));
+        }
     }
 
     final Function<Id, Node> lookup;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
index d069118..dad08b0 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
@@ -22,8 +22,8 @@
 
 import accord.maelstrom.Packet.Type;
 import accord.messages.MessageType;
-import com.google.gson.stream.JsonWriter;
 import accord.messages.Reply;
+import com.google.gson.stream.JsonWriter;
 
 public class Error extends Body implements Reply
 {
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 34f34d1..30f2a0f 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -18,13 +18,20 @@
 
 package accord.maelstrom;
 
-import accord.local.Command;
-import accord.local.Node;
+import java.util.concurrent.TimeUnit;
+
 import accord.api.Agent;
 import accord.api.Result;
-import accord.primitives.*;
+import accord.local.Command;
+import accord.local.Node;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 
-import java.util.concurrent.TimeUnit;
+import static accord.utils.Invariants.checkState;
 
 public class MaelstromAgent implements Agent
 {
@@ -33,10 +40,15 @@
     @Override
     public void onRecover(Node node, Result success, Throwable fail)
     {
+        if (fail != null)
+        {
+            checkState(success == null, "fail (%s) and success (%s) are both not null", fail, success);
+            // Failures from Recover are not processed here, even though the interface provides them
+        }
         if (success != null)
         {
             MaelstromResult result = (MaelstromResult) success;
-            node.reply(result.client, MaelstromReplyContext.contextFor(result.requestId), new MaelstromReply(result.requestId, result));
+            node.reply(result.client, MaelstromReplyContext.contextFor(result.requestId), new MaelstromReply(result.requestId, result), null);
         }
     }
 
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
index e81f539..f9aad09 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
@@ -23,17 +23,18 @@
 import java.util.TreeSet;
 
 import accord.api.Key;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.maelstrom.Packet.Type;
 import accord.messages.MessageType;
+import accord.messages.Reply;
 import accord.messages.ReplyContext;
+import accord.messages.Request;
 import accord.primitives.Keys;
+import accord.primitives.Txn;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonToken;
 import com.google.gson.stream.JsonWriter;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.primitives.Txn;
-import accord.maelstrom.Packet.Type;
-import accord.messages.Request;
 
 public class MaelstromRequest extends Body implements Request
 {
@@ -49,8 +50,8 @@
     public void process(Node node, Id client, ReplyContext replyContext)
     {
         node.coordinate(txn).addCallback((success, fail) -> {
-            if (success != null) node.reply(client, replyContext, new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success));
-//            else node.reply(client, messageId, new Error(messageId, 13, fail.getMessage()));
+            Reply reply = success != null ? new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success) : null;
+            node.reply(client, replyContext, reply, fail);
         });
     }
 
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 0d96058..1600e03 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -44,6 +44,7 @@
 import accord.maelstrom.Packet.Type;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.topology.Topology;
@@ -129,6 +130,12 @@
         {
             send(new Packet(self, replyToNode, MaelstromReplyContext.messageIdFor(replyContext), reply));
         }
+
+        @Override
+        public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+        {
+            reply(replyingToNode, replyContext, new FailureReply(failure));
+        }
     }
 
     public static void listen(TopologyFactory topologyFactory, InputStream stdin, PrintStream out, PrintStream err) throws IOException