Allow exceptions to be propagated remotely
https://github.com/apache/cassandra-accord/pull/56
Patch by Ariel Weisberg; Reviewed by David Capwell for CASSANDRA-18779
diff --git a/accord-core/src/main/java/accord/api/MessageSink.java b/accord-core/src/main/java/accord/api/MessageSink.java
index ee4d681..47a3fa0 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/api/MessageSink.java
@@ -30,4 +30,5 @@
void send(Id to, Request request);
void send(Id to, Request request, AgentExecutor executor, Callback callback);
void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
+ void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure);
}
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index daaf81f..1b5b39b 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -77,11 +77,6 @@
tryFailure(new RuntimeException("Unexpected reply"));
return;
- case Error:
- // TODO (required): error propagation
- tryFailure(new RuntimeException("Unknown error"));
- return;
-
case Invalid:
tryFailure(new Invalidated(exclusiveSyncPoint.syncId, exclusiveSyncPoint.homeKey));
return;
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index da39a05..7a64395 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -24,14 +24,20 @@
import accord.api.Data;
import accord.api.Result;
import accord.local.Node;
-import accord.messages.ReadTxnData;
+import accord.local.Node.Id;
+import accord.messages.Commit;
import accord.messages.ReadData.ReadNack;
import accord.messages.ReadData.ReadOk;
import accord.messages.ReadData.ReadReply;
-import accord.primitives.*;
+import accord.messages.ReadTxnData;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
import accord.topology.Topologies;
-import accord.local.Node.Id;
-import accord.messages.Commit;
import accord.topology.Topology;
import static accord.coordinate.ReadCoordinator.Action.Approve;
@@ -111,9 +117,6 @@
switch (nack)
{
default: throw new IllegalStateException();
- case Error:
- // TODO (expected): report content of error
- return Action.Reject;
case Redundant:
callback.accept(null, new Preempted(txnId, route.homeKey()));
return Action.Aborted;
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index eedf19d..aaeeb86 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -154,8 +154,6 @@
case Invalid:
case Redundant:
throw new AssertionError(String.format("Unexpected reply: %s", reply));
- case Error:
- // TODO (required): ensure errors are propagated to coordinators and can be logged
}
}
return;
@@ -273,9 +271,11 @@
}
@Override
- protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+ protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail)
{
- node.reply(replyTo, replyContext, new FetchResponse(unavailable, data, maxApplied));
+ // TODO (review): If the fetch response actually does some streaming, but we send back the error
+ // it is a lot of work and data that might move and be unaccounted for at the coordinator
+ node.reply(replyTo, replyContext, fail == null ? new FetchResponse(unavailable, data, maxApplied) : null, fail);
}
@Override
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 7cf84df..1916f0b 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -34,7 +34,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import accord.primitives.EpochSupplier;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +59,7 @@
import accord.messages.Request;
import accord.messages.TxnRequest;
import accord.primitives.Ballot;
+import accord.primitives.EpochSupplier;
import accord.primitives.FullRoute;
import accord.primitives.ProgressToken;
import accord.primitives.Range;
@@ -475,10 +475,17 @@
messageSink.send(to, send);
}
- public void reply(Id replyingToNode, ReplyContext replyContext, Reply send)
+ public void reply(Id replyingToNode, ReplyContext replyContext, Reply send, Throwable failure)
{
- // TODO (usability, now): add Throwable as an argument so the error check is here, every single message gets this wrong causing a NPE here
- if (send == null)
+ if (failure != null)
+ {
+ agent.onUncaughtException(failure);
+ if (send != null)
+ agent().onUncaughtException(new IllegalArgumentException(String.format("fail (%s) and send (%s) are both not null", failure, send)));
+ messageSink.replyWithUnknownFailure(replyingToNode, replyContext, failure);
+ return;
+ }
+ else if (send == null)
{
NullPointerException e = new NullPointerException();
agent.onUncaughtException(e);
@@ -633,7 +640,16 @@
return;
}
}
- scheduler.now(() -> request.process(this, from, replyContext));
+ scheduler.now(() -> {
+ try
+ {
+ request.process(this, from, replyContext);
+ }
+ catch (Throwable t)
+ {
+ reply(from, replyContext, null, t);
+ }
+ });
}
public Scheduler scheduler()
diff --git a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
index fe1790f..482fcf3 100644
--- a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
+++ b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
@@ -56,7 +56,7 @@
@Override
public void accept(R reply, Throwable failure)
{
- node.reply(replyTo, replyContext, reply);
+ node.reply(replyTo, replyContext, reply, failure);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
index a55bcf4..1554e5d 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -18,21 +18,30 @@
package accord.messages;
-import accord.local.Commands;
-import accord.local.Commands.AcceptOutcome;
-import accord.local.SafeCommand;
-import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.local.Node.Id;
-import accord.topology.Topologies;
-
-import accord.api.RoutingKey;
-
import javax.annotation.Nonnull;
-
import javax.annotation.Nullable;
-import static accord.local.Commands.AcceptOutcome.*;
+import accord.api.RoutingKey;
+import accord.local.Commands;
+import accord.local.Commands.AcceptOutcome;
+import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import static accord.local.Commands.AcceptOutcome.Redundant;
+import static accord.local.Commands.AcceptOutcome.RejectedBallot;
+import static accord.local.Commands.AcceptOutcome.Success;
+import static accord.local.Commands.AcceptOutcome.Truncated;
// TODO (low priority, efficiency): use different objects for send and receive, so can be more efficient
// (e.g. serialize without slicing, and without unnecessary fields)
@@ -118,7 +127,7 @@
@Override
public void accept(AcceptReply reply, Throwable failure)
{
- node.reply(replyTo, replyContext, reply);
+ node.reply(replyTo, replyContext, reply, failure);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java
index da06f6b..12d5614 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -153,9 +153,7 @@
@Override
public void accept(ApplyReply reply, Throwable failure)
{
- if (failure != null)
- node.agent().onUncaughtException(failure);
- node.reply(replyTo, replyContext, reply);
+ node.reply(replyTo, replyContext, reply, failure);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index f1a16b8..1efd652 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -18,25 +18,43 @@
package accord.messages;
-import accord.api.Result;
-import accord.local.*;
-import accord.primitives.*;
-import accord.topology.Topologies;
-
import java.util.List;
-
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import accord.utils.Invariants;
-
+import accord.api.Result;
+import accord.local.Command;
+import accord.local.Commands;
import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.Status;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.SafeCommandStore.TestDep.WITHOUT;
import static accord.local.SafeCommandStore.TestKind.shouldHaveWitnessed;
-import static accord.local.SafeCommandStore.TestTimestamp.*;
-import static accord.local.Status.*;
+import static accord.local.SafeCommandStore.TestTimestamp.EXECUTES_AFTER;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_AFTER;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
+import static accord.local.Status.Accepted;
+import static accord.local.Status.Committed;
+import static accord.local.Status.Phase;
+import static accord.local.Status.PreAccepted;
+import static accord.local.Status.PreCommitted;
import static accord.messages.PreAccept.calculatePartialDeps;
public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply>
@@ -179,7 +197,7 @@
@Override
public void accept(RecoverReply reply, Throwable failure)
{
- node.reply(replyTo, replyContext, reply);
+ node.reply(replyTo, replyContext, reply, failure);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java
index a18b2c5..4c8ae20 100644
--- a/accord-core/src/main/java/accord/messages/CheckStatus.java
+++ b/accord-core/src/main/java/accord/messages/CheckStatus.java
@@ -18,6 +18,8 @@
package accord.messages;
+import javax.annotation.Nullable;
+
import accord.api.Result;
import accord.api.RoutingKey;
import accord.coordinate.Infer;
@@ -46,7 +48,6 @@
import accord.topology.Topologies;
import accord.utils.Invariants;
import accord.utils.MapReduceConsume;
-import javax.annotation.Nullable;
import static accord.local.Status.Committed;
import static accord.local.Status.Definition;
@@ -175,8 +176,9 @@
@Override
public void accept(CheckStatusReply ok, Throwable failure)
{
- if (ok == null) node.reply(replyTo, replyContext, CheckStatusNack.NotOwned);
- else node.reply(replyTo, replyContext, ok);
+ if (failure != null) node.reply(replyTo, replyContext, ok, failure);
+ else if (ok == null) node.reply(replyTo, replyContext, CheckStatusNack.NotOwned, null);
+ else node.reply(replyTo, replyContext, ok, null);
}
private Status invalidIfNotAtLeast(SafeCommandStore safeStore)
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index 3954a8e..b755827 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -19,21 +19,37 @@
package accord.messages;
import java.util.Set;
-
-import accord.local.*;
-import accord.messages.ReadData.ReadNack;
-import accord.messages.ReadData.ReadReply;
-import accord.primitives.*;
-import accord.local.Node.Id;
-import accord.topology.Topologies;
import javax.annotation.Nullable;
-import accord.utils.Invariants;
-
-import accord.topology.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.local.Commands;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.messages.ReadData.ReadNack;
+import accord.messages.ReadData.ReadReply;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+
import static accord.local.Status.Committed;
import static accord.local.Status.Known.DefinitionOnly;
@@ -177,14 +193,8 @@
@Override
public synchronized void accept(ReadNack reply, Throwable failure)
{
- if (failure != null)
- {
- logger.error("Unhandled exception during commit", failure);
- node.agent().onUncaughtException(failure);
- return;
- }
- if (reply != null)
- node.reply(replyTo, replyContext, reply);
+ if (reply != null || failure != null)
+ node.reply(replyTo, replyContext, reply, failure);
else if (read != null)
read.process(node, replyTo, replyContext);
if (defer != null)
diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java b/accord-core/src/main/java/accord/messages/GetDeps.java
index f757aa7..083c15c 100644
--- a/accord-core/src/main/java/accord/messages/GetDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetDeps.java
@@ -18,14 +18,19 @@
package accord.messages;
-import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.utils.Invariants;
+import javax.annotation.Nonnull;
import accord.local.Node.Id;
+import accord.local.SafeCommandStore;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
import accord.topology.Topologies;
-
-import javax.annotation.Nonnull;
+import accord.utils.Invariants;
import static accord.messages.PreAccept.calculatePartialDeps;
@@ -78,8 +83,7 @@
@Override
public void accept(PartialDeps result, Throwable failure)
{
- if (result == null) node.agent().onUncaughtException(failure); // TODO (expected): propagate failures to coordinator
- else node.reply(replyTo, replyContext, new GetDepsOk(result));
+ node.reply(replyTo, replyContext, result != null ? new GetDepsOk(result) : null, failure);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java
index 79a1612..d9f4221 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -19,10 +19,17 @@
package accord.messages;
import accord.api.ProgressLog.ProgressShard;
-import accord.local.*;
+import accord.local.Commands;
import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.Status;
import accord.local.Status.Durability;
-import accord.primitives.*;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialRoute;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.Invariants;
@@ -104,14 +111,7 @@
@Override
public void accept(Reply reply, Throwable failure)
{
- // TODO: respond with failure
- if (reply == null)
- {
- if (failure == null)
- throw new IllegalStateException("Processed nothing on this node");
- throw new IllegalStateException(failure);
- }
- node.reply(replyTo, replyContext, reply);
+ node.reply(replyTo, replyContext, reply, failure);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index 8b68436..7909157 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -54,6 +54,7 @@
SET_GLOBALLY_DURABLE_REQ (true ),
QUERY_DURABLE_BEFORE_REQ (false),
QUERY_DURABLE_BEFORE_RSP (false),
+ FAILURE_RSP (false),
;
/**
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index d035e8e..1be51ca 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -20,18 +20,28 @@
import java.util.List;
import java.util.Objects;
-
-import accord.local.*;
-import accord.local.SafeCommandStore.TestKind;
-
-import accord.local.Node.Id;
-import accord.messages.TxnRequest.WithUnsynced;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-
import javax.annotation.Nullable;
-import accord.primitives.*;
+import accord.local.Command;
+import accord.local.Commands;
+import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SafeCommandStore.TestKind;
+import accord.messages.TxnRequest.WithUnsynced;
+import accord.primitives.Deps;
+import accord.primitives.EpochSupplier;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Shard;
+import accord.topology.Topologies;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
@@ -147,8 +157,7 @@
@Override
public void accept(PreAcceptReply reply, Throwable failure)
{
- // TODO (required, error handling): communicate back the failure
- node.reply(replyTo, replyContext, reply);
+ node.reply(replyTo, replyContext, reply, failure);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 618e147..66b5234 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -21,21 +21,20 @@
import java.util.BitSet;
import javax.annotation.Nullable;
-import accord.api.Data;
-import accord.primitives.Participants;
-import accord.topology.Topologies;
-import accord.utils.Invariants;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.Data;
import accord.local.CommandStore;
import accord.local.Node;
import accord.local.SafeCommandStore;
import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
import static accord.messages.MessageType.READ_RSP;
import static accord.messages.TxnRequest.computeWaitForEpoch;
@@ -71,7 +70,7 @@
protected abstract void cancel();
protected abstract long executeAtEpoch();
- protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data);
+ protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail);
@Override
public long waitForEpoch()
@@ -99,12 +98,12 @@
{
if (reply != null)
{
- node.reply(replyTo, replyContext, reply);
+ node.reply(replyTo, replyContext, reply, failure);
}
else if (failure != null)
{
// TODO (expected, testing): test
- node.reply(replyTo, replyContext, ReadNack.Error);
+ node.reply(replyTo, replyContext, null, failure);
data = null;
// TODO (expected, exceptions): probably a better way to handle this, as might not be uncaught
node.agent().onUncaughtException(failure);
@@ -129,7 +128,7 @@
// and prevents races where we respond before dispatching all the required reads (if the reads are
// completing faster than the reads can be setup on all required shards)
if (-1 == --waitingOnCount)
- reply(this.unavailable, data);
+ reply(this.unavailable, data, null);
}
protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable)
@@ -153,7 +152,8 @@
{
// TODO (expected, exceptions): should send exception to client, and consistency handle/propagate locally
logger.trace("{}: read failed for {}: {}", txnId, unsafeStore, throwable);
- node.reply(replyTo, replyContext, ReadNack.Error);
+ node.reply(replyTo, replyContext, null, throwable);
+ cancel();
}
else
readComplete(unsafeStore, next, unavailable);
@@ -173,7 +173,7 @@
public enum ReadNack implements ReadReply
{
- Invalid, NotCommitted, Redundant, Error;
+ Invalid, NotCommitted, Redundant;
@Override
public String toString()
diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/ReadTxnData.java
index b34e9ae..4e31910 100644
--- a/accord-core/src/main/java/accord/messages/ReadTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java
@@ -199,7 +199,7 @@
if (state == State.PENDING)
{
state = State.OBSOLETE;
- node.reply(replyTo, replyContext, Redundant);
+ node.reply(replyTo, replyContext, Redundant, null);
}
}
@@ -231,18 +231,20 @@
}
@Override
- protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+ protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail)
{
switch (state)
{
case RETURNED:
- throw new IllegalStateException("ReadOk was sent, yet ack called again");
+ throw new IllegalStateException("ReadOk was sent, yet ack called again", fail);
case OBSOLETE:
logger.debug("After the read completed for txn {}, the result was marked obsolete", txnId);
+ if (fail != null)
+ node.agent().onUncaughtException(fail);
break;
case PENDING:
state = State.RETURNED;
- node.reply(replyTo, replyContext, new ReadOk(unavailable, data));
+ node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail);
break;
default:
throw new AssertionError("Unknown state: " + state);
diff --git a/accord-core/src/main/java/accord/messages/Reply.java b/accord-core/src/main/java/accord/messages/Reply.java
index e82c4f9..e1fc7e0 100644
--- a/accord-core/src/main/java/accord/messages/Reply.java
+++ b/accord-core/src/main/java/accord/messages/Reply.java
@@ -18,7 +18,28 @@
package accord.messages;
+import javax.annotation.Nonnull;
+
+import static accord.messages.MessageType.FAILURE_RSP;
+
public interface Reply extends Message
{
default boolean isFinal() { return true; }
+
+ class FailureReply implements Reply
+ {
+ @Nonnull
+ public final Throwable failure;
+
+ public FailureReply(@Nonnull Throwable failure)
+ {
+ this.failure = failure;
+ }
+
+ @Override
+ public MessageType type()
+ {
+ return FAILURE_RSP;
+ }
+ }
}
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
index 5ec41e0..ca52b69 100644
--- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -20,16 +20,21 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import accord.local.*;
-import accord.local.Node.Id;
-import accord.primitives.*;
-import accord.utils.MapReduceConsume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
-
+import accord.local.Command;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Participants;
+import accord.primitives.TxnId;
import accord.topology.Topology;
+import accord.utils.MapReduceConsume;
+
+import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
public class WaitOnCommit implements Request, MapReduceConsume<SafeCommandStore, Void>, PreLoadContext, Command.TransientListener
{
@@ -141,13 +146,30 @@
@Override
public void accept(Void result, Throwable failure)
{
- ack();
+ if (failure != null)
+ {
+ while (true)
+ {
+ int initialValue = waitingOnUpdater.get(this);
+ if (initialValue == -1)
+ {
+ node.agent().onUncaughtException(new IllegalStateException("Had error in WaitOnCommit, but already replied so can't send failure response", failure));
+ break;
+ }
+ if (waitingOnUpdater.compareAndSet(this, initialValue, -1))
+ node.reply(replyTo, replyContext, null, failure);
+ }
+ }
+ else
+ {
+ ack();
+ }
}
private void ack()
{
if (waitingOnUpdater.decrementAndGet(this) == -1)
- node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE);
+ node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE, null);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index 06905dd..682c33b 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -193,7 +193,7 @@
return;
isInvalid = true;
- node.reply(replyTo, replyContext, Invalid);
+ node.reply(replyTo, replyContext, Invalid, null);
}
synchronized void sendTruncated()
@@ -202,7 +202,7 @@
return;
isInvalid = true;
- node.reply(replyTo, replyContext, Redundant);
+ node.reply(replyTo, replyContext, Redundant, null);
}
void applied(SafeCommandStore safeStore, SafeCommand safeCommand)
@@ -223,12 +223,13 @@
}
@Override
- protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+ protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail)
{
if (isInvalid)
return;
- node.reply(replyTo, replyContext, new ReadOk(unavailable, data));
+ // data can be null so send the failure response if a failure is present
+ node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail);
}
private void removeListener(SafeCommandStore safeStore, TxnId txnId)
diff --git a/accord-core/src/main/java/accord/utils/Utils.java b/accord-core/src/main/java/accord/utils/Utils.java
index 3ce0cf1..696edb9 100644
--- a/accord-core/src/main/java/accord/utils/Utils.java
+++ b/accord-core/src/main/java/accord/utils/Utils.java
@@ -18,13 +18,23 @@
package accord.utils;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.IntFunction;
+
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
-import java.util.*;
-import java.util.function.IntFunction;
-
// TODO (low priority): remove when jdk8 support is dropped
public class Utils
{
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index dfa7fe0..5773e68 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -18,9 +18,6 @@
package accord.utils.async;
-import accord.api.VisibleForImplementation;
-import accord.utils.Invariants;
-
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -28,6 +25,9 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
+import accord.api.VisibleForImplementation;
+import accord.utils.Invariants;
+
public class AsyncResults
{
public static final AsyncResult<Void> SUCCESS_VOID = success(null);
@@ -104,7 +104,7 @@
}
}
- boolean trySetResult(V result, Throwable failure)
+ protected boolean trySetResult(V result, Throwable failure)
{
return trySetResult(new Result<>(result, failure));
}
@@ -323,7 +323,7 @@
@VisibleForImplementation
public static class RunnableResult<V> extends AbstractResult<V> implements Runnable
{
- private final Callable<V> callable;
+ protected final Callable<V> callable;
public RunnableResult(Callable<V> callable)
{
@@ -333,14 +333,19 @@
@Override
public void run()
{
+ // There are two different type of exceptions: user function throws, listener throws. To make sure this is clear,
+ // make sure to catch the exception from the user function and set as failed, and let the listener exceptions bubble up.
+ V call;
try
{
- trySetResult(callable.call(), null);
+ call = callable.call();
}
catch (Throwable t)
{
trySetResult(null, t);
+ return;
}
+ trySetResult(call, null);
}
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 4c9ce6c..01ea96d 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -18,16 +18,6 @@
package accord.burn;
-import accord.api.TestableConfigurationService;
-import accord.local.AgentExecutor;
-import accord.impl.AbstractConfigurationService;
-import accord.primitives.Ranges;
-import accord.utils.RandomSource;
-import accord.local.Node;
-import accord.messages.*;
-import accord.topology.Topology;
-import accord.utils.async.AsyncResults;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -35,6 +25,20 @@
import java.util.function.Function;
import java.util.function.Supplier;
+import accord.api.TestableConfigurationService;
+import accord.impl.AbstractConfigurationService;
+import accord.local.AgentExecutor;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.Reply;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.primitives.Ranges;
+import accord.topology.Topology;
+import accord.utils.RandomSource;
+import accord.utils.async.AsyncResults;
+
public class BurnTestConfigurationService extends AbstractConfigurationService.Minimal implements TestableConfigurationService
{
private final AgentExecutor executor;
@@ -66,7 +70,7 @@
public void process(Node on, Node.Id from, ReplyContext replyContext)
{
Topology topology = on.configService().getTopologyForEpoch(epoch);
- on.reply(from, replyContext, new FetchTopologyReply(topology));
+ on.reply(from, replyContext, new FetchTopologyReply(topology), null);
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index 20f1705..e14fb52 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -23,16 +23,17 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import accord.api.MessageSink;
import accord.local.AgentExecutor;
-import accord.messages.SafeCallback;
-import accord.utils.RandomSource;
import accord.local.Node;
import accord.local.Node.Id;
-import accord.api.MessageSink;
import accord.messages.Callback;
import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
import accord.messages.Request;
+import accord.messages.SafeCallback;
+import accord.utils.RandomSource;
import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
@@ -82,4 +83,10 @@
{
parent.add(self, replyToNode, Packet.getMessageId(replyContext), reply);
}
+
+ @Override
+ public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+ {
+ reply(replyingToNode, replyContext, new FailureReply(failure));
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 63fbae8..2f3bd1b 100644
--- a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -98,7 +98,7 @@
// run without setting the result
try
{
- fn.call();
+ callable.call();
long nowMillis = pending.nowInMillis();
if (periodMillis > 0)
{
@@ -117,7 +117,7 @@
}
catch (Throwable t)
{
- setFailure(t);
+ trySetResult(null, t);
}
}
}
diff --git a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
index 059bc63..0e73309 100644
--- a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
@@ -32,26 +32,11 @@
public abstract class TaskExecutorService extends AbstractExecutorService implements AgentExecutor
{
- public static class Task<T> extends AsyncResults.SettableResult<T> implements Pending, RunnableFuture<T>
+ public static class Task<T> extends AsyncResults.RunnableResult<T> implements Pending, RunnableFuture<T>
{
- protected final Callable<T> fn;
-
public Task(Callable<T> fn)
{
- this.fn = fn;
- }
-
- @Override
- public void run()
- {
- try
- {
- setSuccess(fn.call());
- }
- catch (Throwable t)
- {
- setFailure(t);
- }
+ super(fn);
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index a0b55d3..5fb3924 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -20,14 +20,20 @@
import java.util.function.Consumer;
+import accord.api.Agent;
+import accord.api.Result;
import accord.impl.mock.Network;
import accord.local.Command;
import accord.local.Node;
-import accord.api.Agent;
-import accord.api.Result;
-import accord.primitives.*;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
import static accord.local.Node.Id.NONE;
+import static accord.utils.Invariants.checkState;
import static com.google.common.base.Functions.identity;
public class ListAgent implements Agent
@@ -46,11 +52,16 @@
@Override
public void onRecover(Node node, Result success, Throwable fail)
{
+ if (fail != null)
+ {
+ checkState(success == null, "fail (%s) and success (%s) are both not null", fail, success);
+ // We don't really process errors for Recover here even though it is provided in the interface
+ }
if (success != null)
{
ListResult result = (ListResult) success;
if (result.requestId > Integer.MIN_VALUE)
- node.reply(result.client, Network.replyCtxFor(result.requestId), result);
+ node.reply(result.client, Network.replyCtxFor(result.requestId), result, null);
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index b0874c6..c6127a3 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -35,9 +35,9 @@
import accord.messages.CheckStatus.IncludeInfo;
import accord.messages.MessageType;
import accord.messages.ReplyContext;
+import accord.messages.Request;
import accord.primitives.RoutingKeys;
import accord.primitives.Txn;
-import accord.messages.Request;
import accord.primitives.TxnId;
import static accord.local.Status.Phase.Cleanup;
@@ -107,41 +107,36 @@
@Override
public void accept(Result success, Throwable fail)
{
- // TODO (desired, testing): error handling
- if (success != null)
- {
- node.reply(client, replyContext, (ListResult) success);
- }
- else if (fail instanceof CoordinationFailed)
+ if (fail instanceof CoordinationFailed)
{
RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
TxnId txnId = ((CoordinationFailed) fail).txnId();
if (fail instanceof Invalidated)
{
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
return;
}
- node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null), null);
((Cluster)node.scheduler()).onDone(() -> {
node.commandStores()
.select(homeKey)
.execute(() -> CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f) -> {
if (f != null)
{
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null), null);
return;
}
switch (s)
{
case Truncated:
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null), null);
break;
case Invalidated:
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
break;
case Lost:
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null), null);
break;
case Other:
// currently caught elsewhere in response tracking, but might help to throw an exception here
@@ -149,6 +144,10 @@
}));
});
}
+ else
+ {
+ node.reply(client, replyContext, (ListResult) success, fail);
+ }
}
}
diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
index e33314d..ba0502e 100644
--- a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
+++ b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
@@ -18,11 +18,13 @@
package accord.impl.mock;
+import accord.api.MessageSink;
import accord.local.AgentExecutor;
import accord.local.Node;
-import accord.api.MessageSink;
+import accord.local.Node.Id;
import accord.messages.Callback;
import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
import accord.messages.Request;
@@ -54,4 +56,10 @@
{
network.reply(node, replyingToNode, Network.getMessageId(replyContext), reply);
}
+
+ @Override
+ public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+ {
+ network.reply(node, replyingToNode, Network.getMessageId(replyContext), new FailureReply(failure));
+ }
}
diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java b/accord-core/src/test/java/accord/utils/MessageTask.java
index d1064e3..528a6e4 100644
--- a/accord-core/src/test/java/accord/utils/MessageTask.java
+++ b/accord-core/src/test/java/accord/utils/MessageTask.java
@@ -18,16 +18,25 @@
package accord.utils;
-import accord.local.AgentExecutor;
-import accord.local.Node;
-import accord.messages.*;
-import accord.utils.async.AsyncResults;
-import com.google.common.collect.ImmutableList;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import com.google.common.collect.ImmutableList;
+
+import accord.local.AgentExecutor;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.Reply;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.utils.async.AsyncResults;
+
/**
* Message task that will continue sending messages to a set of nodes until all
* nodes ack the message.
@@ -88,7 +97,7 @@
@Override
public void process(Node on, Node.Id from, ReplyContext replyContext)
{
- process.process(on, from, success -> on.reply(from, replyContext, success ? SUCCESS : FAILURE));
+ process.process(on, from, success -> on.reply(from, replyContext, success ? SUCCESS : FAILURE, null));
}
@Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 51da1b3..cd7741b 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -51,6 +51,7 @@
import accord.local.ShardDistributor;
import accord.messages.Callback;
import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
import accord.messages.Request;
import accord.messages.SafeCallback;
@@ -120,6 +121,12 @@
long replyToMessage = ((Packet) replyContext).body.msg_id;
parent.add(self, replyToNode, replyToMessage, reply);
}
+
+ @Override
+ public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+ {
+ reply(replyingToNode, replyContext, new FailureReply(failure));
+ }
}
final Function<Id, Node> lookup;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
index d069118..dad08b0 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
@@ -22,8 +22,8 @@
import accord.maelstrom.Packet.Type;
import accord.messages.MessageType;
-import com.google.gson.stream.JsonWriter;
import accord.messages.Reply;
+import com.google.gson.stream.JsonWriter;
public class Error extends Body implements Reply
{
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 34f34d1..30f2a0f 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -18,13 +18,20 @@
package accord.maelstrom;
-import accord.local.Command;
-import accord.local.Node;
+import java.util.concurrent.TimeUnit;
+
import accord.api.Agent;
import accord.api.Result;
-import accord.primitives.*;
+import accord.local.Command;
+import accord.local.Node;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
-import java.util.concurrent.TimeUnit;
+import static accord.utils.Invariants.checkState;
public class MaelstromAgent implements Agent
{
@@ -33,10 +40,15 @@
@Override
public void onRecover(Node node, Result success, Throwable fail)
{
+ if (fail != null)
+ {
+ checkState(success == null, "fail (%s) and success (%s) are both not null", fail, success);
+ // We don't really process errors for Recover here even though it is provided in the interface
+ }
if (success != null)
{
MaelstromResult result = (MaelstromResult) success;
- node.reply(result.client, MaelstromReplyContext.contextFor(result.requestId), new MaelstromReply(result.requestId, result));
+ node.reply(result.client, MaelstromReplyContext.contextFor(result.requestId), new MaelstromReply(result.requestId, result), null);
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
index e81f539..f9aad09 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
@@ -23,17 +23,18 @@
import java.util.TreeSet;
import accord.api.Key;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.maelstrom.Packet.Type;
import accord.messages.MessageType;
+import accord.messages.Reply;
import accord.messages.ReplyContext;
+import accord.messages.Request;
import accord.primitives.Keys;
+import accord.primitives.Txn;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.primitives.Txn;
-import accord.maelstrom.Packet.Type;
-import accord.messages.Request;
public class MaelstromRequest extends Body implements Request
{
@@ -49,8 +50,8 @@
public void process(Node node, Id client, ReplyContext replyContext)
{
node.coordinate(txn).addCallback((success, fail) -> {
- if (success != null) node.reply(client, replyContext, new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success));
-// else node.reply(client, messageId, new Error(messageId, 13, fail.getMessage()));
+ Reply reply = success != null ? new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success) : null;
+ node.reply(client, replyContext, reply, fail);
});
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 0d96058..1600e03 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -44,6 +44,7 @@
import accord.maelstrom.Packet.Type;
import accord.messages.Callback;
import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
import accord.messages.Request;
import accord.topology.Topology;
@@ -129,6 +130,12 @@
{
send(new Packet(self, replyToNode, MaelstromReplyContext.messageIdFor(replyContext), reply));
}
+
+ @Override
+ public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
+ {
+ reply(replyingToNode, replyContext, new FailureReply(failure));
+ }
}
public static void listen(TopologyFactory topologyFactory, InputStream stdin, PrintStream out, PrintStream err) throws IOException