filter and record faulty in AbstractCoordination
diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index dcb6059..6b7fe39 100644
--- a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -28,7 +28,6 @@
import accord.primitives.FullRoute;
import accord.primitives.TxnId;
import accord.topology.Topologies;
-import accord.utils.SortedList;
import static accord.api.ProtocolModifiers.QuorumEpochIntersections;
import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
@@ -75,10 +74,4 @@
{
return CoordinationKind.PreAccept;
}
-
- @Override
- public SortedList<Id> nodes()
- {
- return topologies.nodes();
- }
}
diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
index d5e2da6..1d92fc7 100644
--- a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
+++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
@@ -29,6 +29,7 @@
import accord.api.Tracing;
import accord.coordinate.tracking.AbstractTracker;
+import accord.coordinate.tracking.RequestStatus;
import accord.local.Node;
import accord.local.SequentialAsyncExecutor;
import accord.messages.Callback;
@@ -36,6 +37,7 @@
import accord.primitives.Participants;
import accord.primitives.Route;
import accord.primitives.TxnId;
+import accord.topology.Topologies;
import accord.utils.DebugMap;
import accord.utils.Invariants;
import accord.utils.SimpleBitSet;
@@ -73,6 +75,8 @@
abstract void onSuccessInternal(Node.Id from, int fromIndex, Reply reply);
abstract void onFailureInternal(Node.Id from, int fromIndex, Throwable fail);
void onSlowResponseInternal(Node.Id from) {}
+ public abstract @Nonnull AbstractTracker<?> tracker();
+ public SortedList<Node.Id> nodes() { return nodes; }
void recordOk(int fromIndex, Ok ok)
{
@@ -171,15 +175,28 @@
void contact(Function<Node.Id, Request> request, @Nullable Predicate<Node.Id> include)
{
executor.executeMaybeImmediately(() -> {
+ AbstractTracker<?> tracker = tracker();
+ Topologies topologies = tracker.topologies();
for (int i = 0; i < nodes.size() ; ++i)
{
Node.Id to = nodes.get(i);
if (include == null || include.test(to))
{
- Invariants.require(replyState[i] == null);
- expectingReply.set(i);
- replyState[i] = node.send(to, request.apply(to), executor, this);
- Invariants.require(expectingReply.get(i) || replyState[i] == null);
+ if (topologies.isFaulty(to))
+ {
+ if (RequestStatus.Failed == tracker.prerecordFailure(to))
+ {
+ finishOnExaustion();
+ return;
+ }
+ }
+ else
+ {
+ Invariants.require(replyState[i] == null);
+ expectingReply.set(i);
+ replyState[i] = node.send(to, request.apply(to), executor, this);
+ Invariants.require(expectingReply.get(i) || replyState[i] == null);
+ }
}
}
});
@@ -340,7 +357,7 @@
return kind().name() + ':' + txnId
+ " scope:" + scope()
+ " inflight:" + inflight()
- + (tracker == null ? "" : " tracker:" + tracker.summariseTracker())
+ + " tracker:" + tracker.summariseTracker()
+ (describe.isEmpty() ? "" : ' ' + describe)
+ (replies == null ? "" : " replies:" + summariseReplies(replies, 60));
}
diff --git a/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java b/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java
index c967c16..8ca9d93 100644
--- a/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java
+++ b/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java
@@ -24,6 +24,7 @@
import javax.annotation.Nullable;
+import accord.coordinate.tracking.AbstractTracker;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
import accord.local.Node.Id;
@@ -39,8 +40,6 @@
import accord.primitives.Unseekables;
import accord.topology.Topologies;
import accord.utils.Invariants;
-import accord.utils.SortedArrays.SortedArrayList;
-import accord.utils.SortedList;
import accord.utils.SortedListMap;
import static accord.coordinate.tracking.RequestStatus.Failed;
@@ -73,16 +72,8 @@
@Override
void start()
{
- SortedArrayList<Id> contact = tracker.filterAndRecordFaulty();
- if (contact == null)
- {
- finishOnExaustion();
- }
- else
- {
- super.start();
- contact(to -> new GetLatestDeps(to, tracker.topologies(), scope, txnId, ballot, executeAt));
- }
+ super.start();
+ contact(to -> new GetLatestDeps(to, tracker.topologies(), scope, txnId, ballot, executeAt));
}
@Override
@@ -125,8 +116,8 @@
}
@Override
- public SortedList<Id> nodes()
+ public AbstractTracker<?> tracker()
{
- return tracker.nodes();
+ return tracker;
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Coordination.java b/accord-core/src/main/java/accord/coordinate/Coordination.java
index 51c413f..469c134 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordination.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordination.java
@@ -18,6 +18,7 @@
package accord.coordinate;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.coordinate.tracking.AbstractTracker;
@@ -55,20 +56,14 @@
}
long coordinationId();
-
TxnId txnId();
CoordinationKind kind();
Participants<?> scope();
+ @Nullable AbstractTracker<?> tracker();
+ @Nullable SortedList<Id> nodes();
default @Nullable Ballot ballot() { return null; }
- default SortedList<Id> nodes()
- {
- AbstractTracker<?> tracker = tracker();
- return tracker == null ? null : tracker.nodes();
- }
- default @Nullable AbstractTracker<?> tracker() { return null; }
-
default String describe() { return ""; }
default @Nullable SortedList<Id> inflight() { return null; }
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index c496212..805bb83 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -139,16 +139,11 @@
void start()
{
node.agent().coordinatorEvents().onExecuting(syncPoint.syncId, null, syncPoint.waitFor, null);
- SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
// TODO (desired): special Apply message that doesn't resend deps if path=MEDIUM
Txn txn = node.agent().emptySystemTxn(syncPoint.syncId.kind(), syncPoint.syncId.domain());
Result result = txn.result(syncPoint.syncId, syncPoint.executeAt, null);
- if (contact == null) finishOnExaustion();
- else
- {
- super.start();
- contact(to -> new ApplyThenWaitUntilApplied(to, tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, null, result));
- }
+ super.start();
+ contact(to -> new ApplyThenWaitUntilApplied(to, tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, null, result));
}
@Override
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index 6e4f134..c26ed34 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -94,16 +94,8 @@
@Override
void start()
{
- SortedArrays.SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
- if (contact == null)
- {
- finishOnExaustion();
- }
- else
- {
- super.start();
- contact(to -> new BeginInvalidation(to, tracker.topologies(), txnId, scope, ballot));
- }
+ super.start();
+ contact(to -> new BeginInvalidation(to, tracker.topologies(), txnId, scope, ballot));
}
@Override
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java
index 80edbc9..cd57a08 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -132,16 +132,8 @@
node.agent().coordinatorEvents().onExecuted(txnId, ballot);
// applyMinimal is used for transaction execution by the original coordinator so it's important to use
// Node's Apply factory in case the factory has to do synchronous Apply.
- SortedArrays.SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
- if (contact == null)
- {
- finishOnExaustion();
- }
- else
- {
- super.start();
- contact(to -> factory.create(applyKind, to, tracker.topologies(), txnId, ballot, sendTo, txn, executeAt, stableDeps, writes, result, scope, flags.get(to)));
- }
+ super.start();
+ contact(to -> factory.create(applyKind, to, tracker.topologies(), txnId, ballot, sendTo, txn, executeAt, stableDeps, writes, result, scope, flags.get(to)));
}
@Override
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index 1e0aec0..8d05206 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -86,16 +86,8 @@
@Override
void start()
{
- SortedArrays.SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
- if (contact == null)
- {
- finishOnExaustion();
- }
- else
- {
- super.start();
- contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, require != scope));
- }
+ super.start();
+ contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, require != scope));
}
@Override
diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java b/accord-core/src/main/java/accord/coordinate/Stabilise.java
index cb3c52a..925d44d 100644
--- a/accord-core/src/main/java/accord/coordinate/Stabilise.java
+++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java
@@ -74,18 +74,13 @@
@Override
void start()
{
- SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
+ super.start();
+ contact(to -> new Commit(CommitSlowPath, to, allTopologies, txnId, txn, scope, ballot, executeAt, stabiliseDeps));
if (allTopologies.size() > 1)
- contact = contact.with(allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty));
-
- if (contact == null)
{
- finishOnExaustion();
- }
- else
- {
- super.start();
- contact(to -> new Commit(CommitSlowPath, to, allTopologies, txnId, txn, scope, ballot, executeAt, stabiliseDeps));
+ SortedArrayList<Node.Id> extra = allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty);
+ for (Node.Id to : extra)
+ node.send(to, new Commit(CommitSlowPath, to, allTopologies, txnId, txn, scope, ballot, executeAt, stabiliseDeps));
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
index e9c9731..6f011c8 100644
--- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -20,8 +20,11 @@
import java.util.function.Function;
+import javax.annotation.Nullable;
+
import accord.api.Data;
import accord.api.DataStore;
+import accord.coordinate.tracking.AbstractTracker;
import accord.impl.AbstractFetchCoordinator;
import accord.local.CommandStore;
import accord.local.Node;
@@ -74,6 +77,13 @@
return new ListFetchRequest(sourceEpoch, syncId, ranges, partialDeps, partialTxn);
}
+ @Nullable
+ @Override
+ public AbstractTracker<?> tracker()
+ {
+ return null;
+ }
+
static class ListFetchRequest extends FetchRequest
{
public ListFetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn)