Accord support for live migration
Patch by Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-18129
diff --git a/accord-core/build.gradle b/accord-core/build.gradle
index 46b2930..877bac6 100644
--- a/accord-core/build.gradle
+++ b/accord-core/build.gradle
@@ -52,6 +52,7 @@
exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
exclude group: 'org.slf4j', module: 'jcl-over-slf4j'
}
+ testImplementation group: 'org.awaitility', name: 'awaitility', version: '4.2.0'
}
task burn(type: JavaExec) {
diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java
index ff8ab04..4a4460d 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -18,6 +18,8 @@
package accord.api;
+import javax.annotation.Nonnull;
+
import accord.local.Command;
import accord.local.Node;
import accord.primitives.Ranges;
@@ -52,6 +54,16 @@
void onFailedBootstrap(String phase, Ranges ranges, Runnable retry, Throwable failure);
+ /**
+ * Invoked with the keys (but not ranges) that have all dependent transactions in the applied
+ * state at this node as of some TxnId. No guarantees are made about other nodes.
+ *
+ * Useful for migrations to/from Accord where you want to know there are no in flight
+ * transactions in Accord that might still execute, and that it is safe to read
+ * outside of Accord.
+ */
+ default void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull Timestamp executeAt) {}
+
@Override
void onUncaughtException(Throwable t);
diff --git a/accord-core/src/main/java/accord/api/BarrierType.java b/accord-core/src/main/java/accord/api/BarrierType.java
new file mode 100644
index 0000000..864a41c
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/BarrierType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+public enum BarrierType
+{
+ // Only wait until the barrier is achieved locally, and possibly don't trigger the barrier remotely.
+ // Local barriers are only on the `minEpoch` provided and have utility limited to establishing
+ // no more transactions will occur in an earlier before minEpoch
+ local(false, true),
+ // Wait until the barrier has been achieved at a quorum globally
+ global_sync(true, false),
+ // Trigger the global barrier, but only block on creation of the barrier and local application
+ global_async(true, true);
+
+ public final boolean global;
+ public final boolean async;
+
+ BarrierType(boolean global, boolean async)
+ {
+ this.global = global;
+ this.async = async;
+ }
+}
diff --git a/accord-core/src/main/java/accord/api/Query.java b/accord-core/src/main/java/accord/api/Query.java
index af495e6..f215d69 100644
--- a/accord-core/src/main/java/accord/api/Query.java
+++ b/accord-core/src/main/java/accord/api/Query.java
@@ -18,11 +18,13 @@
package accord.api;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
-import javax.annotation.Nullable;
-
/**
* The computational/transformation part of a client query
*/
@@ -31,6 +33,9 @@
/**
* Perform some transformation on the complete {@link Data} result of a {@link Read}
* from some {@link DataStore}, to produce a {@link Result} to return to the client.
+ *
+ * executeAt timestamp is provided so that the result of the transaction can be determined based
+ * on configuration at that epoch. This will be deterministic even if the transaction is recovered.
*/
- Result compute(TxnId txnId, Timestamp executeAt, @Nullable Data data, @Nullable Read read, @Nullable Update update);
+ Result compute(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Seekables<?, ?> keys, @Nullable Data data, @Nullable Read read, @Nullable Update update);
}
diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java b/accord-core/src/main/java/accord/coordinate/Barrier.java
new file mode 100644
index 0000000..39e56de
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Barrier.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nonnull;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.BarrierType;
+import accord.api.Key;
+import accord.api.RoutingKey;
+import accord.local.Command;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
+import accord.local.SafeCommandStore.TestTimestamp;
+import accord.local.Status;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.MapReduceConsume;
+import accord.utils.async.AsyncResults;
+
+import static accord.local.PreLoadContext.contextFor;
+import static accord.utils.Invariants.checkArgument;
+import static accord.utils.Invariants.checkState;
+import static accord.utils.async.AsyncChains.getUninterruptibly;
+
+/**
+ * Local or global barriers that return a result once all transactions have their side effects visible.
+ *
+ * For local barriers the epoch is the only guarantee, but for global barriers a new transaction is created so it will
+ * be as of the timestamp of the created transaction.
+ *
+ * Note that reads might still order after, but side effect bearing transactions should not.
+ */
+public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractResult<Timestamp>
+{
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(Barrier.class);
+
+ private final Node node;
+
+ private final S seekables;
+
+ private final long minEpoch;
+ private final BarrierType barrierType;
+
+ @VisibleForTesting
+ CoordinateSyncPoint<S> coordinateSyncPoint;
+ @VisibleForTesting
+ ExistingTransactionCheck existingTransactionCheck;
+
+ Barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType)
+ {
+ checkArgument(keysOrRanges.domain() == Domain.Key || barrierType.global, "Ranges are only supported with global barriers");
+ checkArgument(keysOrRanges.size() == 1 || barrierType.global, "Only a single key is supported with local barriers");
+ this.node = node;
+ this.minEpoch = minEpoch;
+ this.seekables = keysOrRanges;
+ this.barrierType = barrierType;
+ }
+
+ public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType)
+ {
+ Barrier<S> barrier = new Barrier(node, keysOrRanges, minEpoch, barrierType);
+ barrier.start();
+ return barrier;
+ }
+
+ private void start()
+ {
+ // It may be possible to use local state to determine that the barrier is already satisfied or
+ // there is an existing transaction we can wait on
+ if (!barrierType.global)
+ {
+ existingTransactionCheck = checkForExistingTransaction();
+ existingTransactionCheck.addCallback((barrierTxn, existingTransactionCheckFailure) -> {
+ if (existingTransactionCheckFailure != null)
+ {
+ Barrier.this.tryFailure(existingTransactionCheckFailure);
+ return;
+ }
+
+ if (barrierTxn != null)
+ {
+ if (barrierTxn.status.equals(Status.Applied))
+ {
+ doBarrierSuccess(barrierTxn.executeAt);
+ }
+ // A listener was added to the transaction already
+ }
+ else
+ {
+ createSyncPoint();
+ }
+ });
+ }
+ else
+ {
+ createSyncPoint();
+ }
+ }
+
+ private void doBarrierSuccess(Timestamp executeAt)
+ {
+ // The transaction we wait on might have more keys, but it's not
+ // guaranteed they were wanted and we don't want to force the agent to filter them
+ // so provide the seekables we were given.
+ // We also don't notify the agent for range barriers since that makes sense as a local concept
+ // since ranges can easily span multiple nodes
+ if (seekables != null)
+ node.agent().onLocalBarrier(seekables, executeAt);
+ Barrier.this.trySuccess(executeAt);
+ }
+
+ private void createSyncPoint()
+ {
+ try
+ {
+ coordinateSyncPoint = getUninterruptibly(CoordinateSyncPoint.inclusive(node, seekables, barrierType.async));
+ }
+ catch (ExecutionException e)
+ {
+ tryFailure(e);
+ }
+ coordinateSyncPoint.addCallback((syncPoint, syncPointFailure) -> {
+ if (syncPointFailure != null)
+ {
+ Barrier.this.tryFailure(syncPointFailure);
+ return;
+ }
+
+ // Need to wait for the local transaction to finish since coordinate sync point won't wait on anything
+ // if async was requested or there were no deps found
+ if (syncPoint.finishedAsync)
+ {
+ TxnId txnId = syncPoint.syncId;
+ long epoch = txnId.epoch();
+ RoutingKey homeKey = syncPoint.homeKey;
+ node.commandStores().ifLocal(contextFor(txnId), homeKey, epoch, epoch,
+ safeStore -> safeStore.get(txnId, homeKey).addAndInvokeListener(safeStore, new BarrierCommandListener()))
+ .begin(node.agent());
+ }
+ else
+ {
+ doBarrierSuccess(syncPoint.syncId);
+ }
+ });
+ }
+
+ private class BarrierCommandListener implements Command.TransientListener
+ {
+ @Override
+ public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
+ {
+ Command command = safeCommand.current();
+ if (command.is(Status.Applied))
+ {
+ Timestamp executeAt = command.executeAt();
+ // In all the cases where we add a listener (listening to existing command, async completion of CoordinateSyncPoint)
+ // we want to notify the agent
+ doBarrierSuccess(executeAt);
+ }
+ else if (command.hasBeen(Status.Truncated))
+ // Surface the invalidation/truncation and the barrier can be retried by the caller
+ Barrier.this.tryFailure(new Timeout(command.txnId(), command.homeKey()));
+
+ }
+
+ @Override
+ public PreLoadContext listenerPreLoadContext(TxnId caller)
+ {
+ return PreLoadContext.contextFor(caller);
+ }
+ }
+
+ private ExistingTransactionCheck checkForExistingTransaction()
+ {
+ checkState(seekables.size() == 1 && seekables.domain() == Domain.Key);
+ ExistingTransactionCheck check = new ExistingTransactionCheck();
+ Key k = seekables.get(0).asKey();
+ node.commandStores().mapReduceConsume(
+ contextFor(k),
+ k.toUnseekable(),
+ minEpoch,
+ Long.MAX_VALUE,
+ check);
+ return check;
+ }
+
+ // Hold result of looking for a transaction to act as a barrier for an Epoch
+ static class BarrierTxn
+ {
+ @Nonnull
+ public final TxnId txnId;
+ @Nonnull
+ public final Timestamp executeAt;
+ @Nonnull
+ public final Status status;
+ @Nonnull
+ public final Key key;
+ public BarrierTxn(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Status status, Key key)
+ {
+ this.txnId = txnId;
+ this.executeAt = executeAt;
+ this.status = status;
+ this.key = key;
+ }
+
+ public BarrierTxn max(BarrierTxn other)
+ {
+ if (other == null)
+ return this;
+ return status.compareTo(other.status) >= 0 ? this : other;
+ }
+ }
+
+ /*
+ * Check for an existing transaction that is either already Applied (dependencies were applied)
+ * or PreApplied (outcome fixed, dependencies not yet applied).
+ *
+ * For Applied we can return success immediately with the executeAt epoch. For PreApplied we can add
+ * a listener for when it transitions to Applied and then return success.
+ */
+ class ExistingTransactionCheck extends AsyncResults.AbstractResult<BarrierTxn> implements MapReduceConsume<SafeCommandStore, BarrierTxn>
+ {
+ @Override
+ public BarrierTxn apply(SafeCommandStore safeStore)
+ {
+ BarrierTxn found = safeStore.mapReduceWithTerminate(
+ seekables,
+ safeStore.ranges().allAfter(minEpoch),
+ // Barriers are trying to establish that committed transactions are applied before the barrier (or in this case just minEpoch)
+ // so all existing transaction types should ensure that at this point. An earlier txnid may have an executeAt that is after
+ // this barrier or the transaction we listen on and that is fine
+ TestKind.Any,
+ TestTimestamp.EXECUTES_AFTER,
+ TxnId.minForEpoch(minEpoch),
+ TestDep.ANY_DEPS,
+ null,
+ Status.Committed,
+ Status.Applied,
+ (keyOrRange, txnId, executeAt, status, barrierTxn) -> {
+ if (keyOrRange.domain() == Domain.Key)
+ return new BarrierTxn(txnId, executeAt, status, keyOrRange.asKey());
+ return null;
+ },
+ null,
+ // Take the first one we find, and call it good enough to wait on
+ barrierTxn -> barrierTxn != null);
+ // It's not applied so add a listener to find out when it is applied
+ if (found != null && !found.status.equals(Status.Applied))
+ {
+ safeStore.commandStore().execute(
+ contextFor(found.txnId),
+ safeStoreWithTxn -> safeStoreWithTxn.get(found.txnId, found.key.toUnseekable()).addAndInvokeListener(safeStore, new BarrierCommandListener())
+ ).begin(node.agent());
+ }
+ return found;
+ }
+
+ @Override
+ public BarrierTxn reduce(BarrierTxn o1, BarrierTxn o2)
+ {
+ throw new IllegalStateException("Should not be possible to find multiple transactions");
+ }
+
+ @Override
+ public void accept(BarrierTxn result, Throwable failure)
+ {
+ if (failure != null)
+ {
+ ExistingTransactionCheck.this.tryFailure(failure);
+ return;
+ }
+
+ // Will need to create a transaction
+ ExistingTransactionCheck.this.trySuccess(result);
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java b/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java
new file mode 100644
index 0000000..f8b030c
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.function.BiConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Result;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.coordinate.tracking.RequestStatus;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Callback;
+import accord.messages.Commit;
+import accord.messages.ReadData.ReadNack;
+import accord.messages.ReadData.ReadReply;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.RequestStatus.Failed;
+
+/**
+ * Block on deps at quorum for a sync point transaction, and then move the transaction to the applied state
+ */
+public class BlockOnDeps implements Callback<ReadReply>
+{
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(BlockOnDeps.class);
+
+ final Node node;
+ final TxnId txnId;
+ final Txn txn;
+ final FullRoute<?> route;
+ final Deps deps;
+ final Topologies blockOn;
+ final QuorumTracker tracker;
+ final BiConsumer<? super Result, Throwable> callback;
+
+ private boolean isDone = false;
+
+ private BlockOnDeps(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, BiConsumer<? super Result, Throwable> callback)
+ {
+ this.node = node;
+ this.txnId = txnId;
+ this.txn = txn;
+ this.route = route;
+ this.deps = deps;
+ // Sync points don't propose anything so they can execute at their txnId epoch
+ this.blockOn = node.topology().forEpoch(route, txnId.epoch());
+ this.tracker = new QuorumTracker(blockOn);
+ this.callback = callback;
+ }
+
+ public static void blockOnDeps(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, BiConsumer<? super Result, Throwable> callback)
+ {
+ BlockOnDeps blockOnDeps = new BlockOnDeps(node, txnId, txn, route, deps, callback);
+ blockOnDeps.start();
+ }
+
+ void start()
+ {
+ Commit.commitMaximalAndBlockOnDeps(node, blockOn, txnId, txn, route, deps, this);
+ }
+
+ @Override
+ public void onSuccess(Id from, ReadReply reply)
+ {
+ if (isDone)
+ return;
+
+ if (reply.isOk())
+ {
+ if (tracker.recordSuccess(from) == RequestStatus.Success)
+ {
+ isDone = true;
+ callback.accept(txn.result(txnId, txnId, null), null);
+ }
+ return;
+ }
+
+ ReadNack nack = (ReadNack) reply;
+ switch (nack)
+ {
+ default: throw new IllegalStateException();
+ case Redundant:
+ // WaitUntilApplied only sends Redundant on truncation which implies durable and applied
+ isDone = true;
+ callback.accept(txn.result(txnId, txnId, null), null);
+ break;
+ case NotCommitted:
+ throw new IllegalStateException("Received `NotCommitted` response after sending maximal commit as part of `BlockOnDeps`");
+ case Invalid:
+ onFailure(from, new IllegalStateException("Submitted a read command to a replica that did not own the range"));
+ break;
+ }
+ }
+
+ @Override
+ public void onFailure(Id from, Throwable failure)
+ {
+ if (tracker.recordFailure(from) == Failed)
+ {
+ isDone = true;
+ callback.accept(null, failure);
+ }
+ }
+
+ @Override
+ public void onCallbackFailure(Id from, Throwable failure)
+ {
+ isDone = true;
+ callback.accept(null, failure);
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index 1b5b39b..6632f82 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -25,6 +25,7 @@
import accord.messages.ReadData;
import accord.messages.SetShardDurable;
import accord.messages.WaitUntilApplied;
+import accord.primitives.Ranges;
import accord.primitives.SyncPoint;
import accord.topology.Topologies;
import accord.utils.async.AsyncResult;
@@ -37,18 +38,18 @@
{
final Node node;
final AppliedTracker tracker;
- final SyncPoint exclusiveSyncPoint;
+ final SyncPoint<Ranges> exclusiveSyncPoint;
- private CoordinateShardDurable(Node node, SyncPoint exclusiveSyncPoint)
+ private CoordinateShardDurable(Node node, SyncPoint<Ranges> exclusiveSyncPoint)
{
// TODO (required): this isn't correct, we need to potentially perform a second round if a dependency executes in a future epoch and we have lost ownership of that epoch
- Topologies topologies = node.topology().forEpoch(exclusiveSyncPoint.ranges, exclusiveSyncPoint.sourceEpoch());
+ Topologies topologies = node.topology().forEpoch(exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.sourceEpoch());
this.node = node;
this.tracker = new AppliedTracker(topologies);
this.exclusiveSyncPoint = exclusiveSyncPoint;
}
- public static AsyncResult<Void> coordinate(Node node, SyncPoint exclusiveSyncPoint)
+ public static AsyncResult<Void> coordinate(Node node, SyncPoint<Ranges> exclusiveSyncPoint)
{
CoordinateShardDurable coordinate = new CoordinateShardDurable(node, exclusiveSyncPoint);
coordinate.start();
@@ -57,7 +58,7 @@
private void start()
{
- node.send(tracker.nodes(), to -> new WaitUntilApplied(to, tracker.topologies(), exclusiveSyncPoint.syncId, exclusiveSyncPoint.ranges, exclusiveSyncPoint.syncId), this);
+ node.send(tracker.nodes(), to -> new WaitUntilApplied(to, tracker.topologies(), exclusiveSyncPoint.syncId, exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.syncId), this);
}
@Override
@@ -87,7 +88,7 @@
// TODO (required): we also need to handle ranges not being safe to read
if (tracker.recordSuccess(from) == RequestStatus.Success)
{
- node.configService().reportEpochRedundant(exclusiveSyncPoint.ranges, exclusiveSyncPoint.syncId.epoch());
+ node.configService().reportEpochRedundant(exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.syncId.epoch());
node.send(tracker.nodes(), new SetShardDurable(exclusiveSyncPoint));
trySuccess(null);
}
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 6713694..82cf850 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -19,28 +19,32 @@
package accord.coordinate;
import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import accord.local.Node;
import accord.messages.Apply;
import accord.messages.PreAccept.PreAcceptOk;
import accord.primitives.Ballot;
import accord.primitives.Deps;
-import accord.primitives.FullRangeRoute;
import accord.primitives.FullRoute;
-import accord.primitives.Ranges;
+import accord.primitives.Seekables;
import accord.primitives.SyncPoint;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.Txn.Kind;
import accord.primitives.TxnId;
import accord.topology.Topologies;
+import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
-import accord.utils.Invariants;
import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
import static accord.primitives.Timestamp.mergeMax;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.utils.Functions.foldl;
+import static accord.utils.Invariants.checkArgument;
/**
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
@@ -48,45 +52,80 @@
*
* TODO (desired, testing): dedicated burn test to validate outcomes
*/
-public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+public class CoordinateSyncPoint<S extends Seekables<?, ?>> extends CoordinatePreAccept<SyncPoint<S>>
{
- final Ranges ranges;
- private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Ranges ranges)
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(CoordinateSyncPoint.class);
+
+ final S keysOrRanges;
+ // Whether to wait on the dependencies applying globally before returning a result
+ final boolean async;
+
+ private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route, S keysOrRanges, boolean async)
{
super(node, txnId, txn, route, node.topology().withOpenEpochs(route, txnId, txnId));
- this.ranges = ranges;
+ checkArgument(txnId.rw() == Kind.SyncPoint || async, "Exclusive sync points only support async application");
+ this.keysOrRanges = keysOrRanges;
+ this.async = async;
}
- public static AsyncResult<SyncPoint> exclusive(Node node, Ranges keysOrRanges)
+ public static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> exclusive(Node node, S keysOrRanges)
{
- return coordinate(node, ExclusiveSyncPoint, keysOrRanges);
+ return coordinate(node, ExclusiveSyncPoint, keysOrRanges, true);
}
- public static AsyncResult<SyncPoint> inclusive(Node node, Ranges keysOrRanges)
+ public static <S extends Seekables<?, ?>> CoordinateSyncPoint<S> exclusive(Node node, TxnId txnId, S keysOrRanges)
{
- return coordinate(node, Kind.SyncPoint, keysOrRanges);
+ return coordinate(node, txnId, keysOrRanges, true);
}
- private static AsyncResult<SyncPoint> coordinate(Node node, Kind kind, Ranges ranges)
+ public static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> inclusive(Node node, S keysOrRanges, boolean async)
{
- TxnId txnId = node.nextTxnId(kind, ranges.domain());
- return node.withEpoch(txnId.epoch(), () -> {
- FullRangeRoute route = (FullRangeRoute) node.computeRoute(txnId, ranges);
- CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, ranges), route, ranges);
- coordinate.start();
- return coordinate;
- }).beginAsResult();
+ return coordinate(node, Kind.SyncPoint, keysOrRanges, async);
}
- public static AsyncResult<SyncPoint> coordinate(Node node, TxnId txnId, Ranges ranges)
+ private static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> coordinate(Node node, Kind kind, S keysOrRanges, boolean async)
{
- Invariants.checkState(txnId.rw() == Kind.SyncPoint || txnId.rw() == ExclusiveSyncPoint);
- FullRangeRoute route = (FullRangeRoute) node.computeRoute(txnId, ranges);
- CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(txnId.rw(), ranges), route, ranges);
+ checkArgument(kind == Kind.SyncPoint || kind == ExclusiveSyncPoint);
+ node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain());
+ TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+ return node.withEpoch(txnId.epoch(), () ->
+ AsyncChains.success(coordinate(node, txnId, keysOrRanges, async))
+ ).beginAsResult();
+ }
+
+ private static <S extends Seekables<?, ?>> CoordinateSyncPoint<S> coordinate(Node node, TxnId txnId, S keysOrRanges, boolean async)
+ {
+ checkArgument(txnId.rw() == Kind.SyncPoint || txnId.rw() == ExclusiveSyncPoint);
+ FullRoute route = node.computeRoute(txnId, keysOrRanges);
+ CoordinateSyncPoint<S> coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(txnId.rw(), keysOrRanges), route, keysOrRanges, async);
coordinate.start();
return coordinate;
}
+ static <S extends Seekables<?, ?>> void blockOnDeps(Node node, Txn txn, TxnId txnId, FullRoute<?> route, S keysOrRanges, Deps deps, BiConsumer<SyncPoint<S>, Throwable> callback, boolean async)
+ {
+ // If deps are empty there is nothing to wait on application for so we can return immediately
+ boolean processAsyncCompletion = deps.isEmpty() || async;
+ BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, (result, throwable) -> {
+ // Don't want to process completion twice
+ if (processAsyncCompletion)
+ {
+ // Don't lose the error
+ if (throwable != null)
+ node.agent().onUncaughtException(throwable);
+ return;
+ }
+ if (throwable != null)
+ callback.accept(null, throwable);
+ else
+ callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, route, false), null);
+ });
+ // Notify immediately and the caller can add a listener to command completion to track local application
+ if (processAsyncCompletion)
+ callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, route, true), null);
+ }
+
@Override
void onNewEpoch(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> successes)
{
@@ -112,16 +151,10 @@
executeAt = txnId;
// we don't need to fetch deps from Accept replies, so we don't need to contact unsynced epochs
topologies = node.topology().forEpoch(route, txnId.epoch());
- new Propose<SyncPoint>(node, topologies, Ballot.ZERO, txnId, txn, route, executeAt, deps, this)
- {
- @Override
- void onAccepted()
- {
- Apply.sendMaximal(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null));
- node.configService().reportEpochClosed(ranges, txnId.epoch());
- accept(new SyncPoint(txnId, deps, ranges, (FullRangeRoute) route), null);
- }
- }.start();
+ if (tracker.hasFastPathAccepted() && txnId.rw() == Kind.SyncPoint)
+ blockOnDeps(node, txn, txnId, route, keysOrRanges, deps, this, async);
+ else
+ ProposeSyncPoint.proposeSyncPoint(node, topologies, Ballot.ZERO, txnId, txn, route, deps, executeAt, this, async, tracker.nodes(), keysOrRanges);
}
}
@@ -129,7 +162,7 @@
{
TxnId txnId = syncPoint.syncId;
Timestamp executeAt = txnId;
- Txn txn = node.agent().emptyTxn(txnId.rw(), syncPoint.ranges);
+ Txn txn = node.agent().emptyTxn(txnId.rw(), syncPoint.keysOrRanges);
Deps deps = syncPoint.waitFor;
Apply.sendMaximal(node, to, txnId, syncPoint.route(), txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null));
}
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 7a64395..edfd033 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -36,6 +36,7 @@
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
+import accord.primitives.Txn.Kind;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.topology.Topology;
@@ -43,6 +44,7 @@
import static accord.coordinate.ReadCoordinator.Action.Approve;
import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
import static accord.messages.Commit.Kind.Maximal;
+import static accord.utils.Invariants.checkArgument;
class Execute extends ReadCoordinator<ReadReply>
{
@@ -69,7 +71,14 @@
public static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback)
{
- if (txn.read().keys().isEmpty())
+ // Recovery calls execute and we would like execute to run BlockOnDeps because that will notify the agent
+ // of the local barrier
+ if (txn.kind() == Kind.SyncPoint)
+ {
+ checkArgument(txnId.equals(executeAt));
+ BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, callback);
+ }
+ else if (txn.read().keys().isEmpty())
{
Result result = txn.result(txnId, executeAt, null);
Persist.persist(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), result);
@@ -105,7 +114,7 @@
{
if (reply.isOk())
{
- ReadOk ok = (ReadOk) reply;
+ ReadOk ok = ((ReadOk) reply);
Data next = ok.data;
if (next != null)
data = data == null ? next : data.merge(next);
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index b72408e..0d27f75 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -62,10 +62,10 @@
private final Map<Id, AcceptReply> debug = debug() ? new HashMap<>() : null;
final Timestamp executeAt;
final QuorumTracker acceptTracker;
- final BiConsumer<? super R, Throwable> callback;
+ final BiConsumer<R, Throwable> callback;
private boolean isDone;
- Propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super R, Throwable> callback)
+ Propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<R, Throwable> callback)
{
this.node = node;
this.ballot = ballot;
diff --git a/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java
new file mode 100644
index 0000000..66db85e
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Apply;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRangeRoute;
+import accord.primitives.FullRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+
+public class ProposeSyncPoint<S extends Seekables<?, ?>> extends Propose<SyncPoint<S>>
+{
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(ProposeSyncPoint.class);
+
+ // Whether to wait on the dependencies applying globally before returning a result
+ final boolean async;
+ final Set<Id> fastPathNodes;
+
+ final S keysOrRanges;
+
+ ProposeSyncPoint(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, Timestamp executeAt, BiConsumer<SyncPoint<S>, Throwable> callback, boolean async, Set<Id> fastPathNodes, S keysOrRanges)
+ {
+ super(node, topologies, ballot, txnId, txn, route, executeAt, deps, callback);
+ this.async = async;
+ this.fastPathNodes = fastPathNodes;
+ this.keysOrRanges = keysOrRanges;
+ }
+
+ public static <S extends Seekables<?, ?>> Propose<SyncPoint<S>> proposeSyncPoint(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, Timestamp executeAt, BiConsumer<SyncPoint<S>, Throwable> callback, boolean async, Set<Id> fastPathNodes, S keysOrRanges)
+ {
+ ProposeSyncPoint proposeSyncPoint = new ProposeSyncPoint(node, topologies, ballot, txnId, txn, route, deps, executeAt, callback, async, fastPathNodes, keysOrRanges);
+ proposeSyncPoint.start();
+ return proposeSyncPoint;
+ }
+
+ @Override
+ void onAccepted()
+ {
+ if (txnId.rw() == ExclusiveSyncPoint)
+ {
+ Apply.sendMaximal(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null));
+ node.configService().reportEpochClosed((Ranges)keysOrRanges, txnId.epoch());
+ callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, (FullRangeRoute) route, true), null);
+ }
+ else
+ {
+ CoordinateSyncPoint.blockOnDeps(node, txn, txnId, route, keysOrRanges, deps, callback, async);
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index aaeeb86..0f03927 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -22,7 +22,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +34,9 @@
import accord.local.Node;
import accord.messages.Callback;
import accord.messages.MessageType;
-import accord.messages.ReadData;
+import accord.messages.ReadData.ReadNack;
+import accord.messages.ReadData.ReadOk;
+import accord.messages.ReadData.ReadReply;
import accord.messages.WaitAndReadData;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
@@ -47,6 +48,7 @@
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
+import javax.annotation.Nullable;
import static accord.messages.ReadData.ReadNack.NotCommitted;
import static accord.primitives.Routables.Slice.Minimal;
@@ -133,10 +135,10 @@
Ranges ownedRanges = ownedRangesForNode(to);
Invariants.checkArgument(ownedRanges.containsAll(ranges), "Got a reply from %s for ranges %s, but owned ranges %s does not contain all the ranges", to, ranges, ownedRanges);
PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
- node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges), collectMaxApplied()), new Callback<ReadData.ReadReply>()
+ node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges), collectMaxApplied()), new Callback<ReadReply>()
{
@Override
- public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+ public void onSuccess(Node.Id from, ReadReply reply)
{
if (!reply.isOk())
{
@@ -148,7 +150,7 @@
{
fail(to, new RuntimeException(reply.toString()));
inflight.remove(key).cancel();
- switch ((ReadData.ReadNack) reply)
+ switch ((ReadNack) reply)
{
default: throw new AssertionError("Unhandled enum: " + reply);
case Invalid:
@@ -285,7 +287,7 @@
}
}
- public static class FetchResponse extends ReadData.ReadOk
+ public static class FetchResponse extends ReadOk
{
public final @Nullable Timestamp maxApplied;
public FetchResponse(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Timestamp maxApplied)
diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseries.java b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
index 4cd34f5..66d26ee 100644
--- a/accord-core/src/main/java/accord/impl/CommandTimeseries.java
+++ b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
@@ -26,6 +26,7 @@
import java.util.stream.Stream;
import javax.annotation.Nullable;
+import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSortedMap;
import accord.api.Key;
@@ -121,6 +122,16 @@
@Nullable Status minStatus, @Nullable Status maxStatus,
SafeCommandStore.CommandFunction<T, T> map, T initialValue, T terminalValue)
{
+ return mapReduceWithTerminate(testKind, testTimestamp, timestamp,
+ testDep, depId,
+ minStatus, maxStatus,
+ map, initialValue, Predicates.equalTo(terminalValue));
+ }
+ public <T> T mapReduceWithTerminate(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+ SafeCommandStore.TestDep testDep, @Nullable TxnId depId,
+ @Nullable Status minStatus, @Nullable Status maxStatus,
+ SafeCommandStore.CommandFunction<T, T> map, T initialValue, Predicate<T> terminatePredicate)
+ {
for (D data : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values())
{
@@ -136,8 +147,8 @@
if (testDep != ANY_DEPS && (!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != (testDep == WITH))))
continue;
Timestamp executeAt = loader.executeAt(data);
- initialValue = map.apply(keyOrRange, txnId, executeAt, initialValue);
- if (initialValue.equals(terminalValue))
+ initialValue = map.apply(keyOrRange, txnId, executeAt, status.status, initialValue);
+ if (terminatePredicate.test(initialValue))
break;
}
return initialValue;
diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java b/accord-core/src/main/java/accord/impl/CommandsForKey.java
index 48cbc7c..ed238af 100644
--- a/accord-core/src/main/java/accord/impl/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java
@@ -20,13 +20,19 @@
import accord.api.Key;
import accord.impl.CommandTimeseries.CommandLoader;
-import accord.local.*;
-import accord.primitives.*;
import com.google.common.collect.ImmutableSortedMap;
-import java.util.*;
+import java.util.Objects;
import java.util.function.Consumer;
+import accord.local.Command;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
import static accord.local.Status.PreAccepted;
import static accord.local.Status.PreCommitted;
diff --git a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
index 35b9ea7..e4db999 100644
--- a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
+++ b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
@@ -224,7 +224,7 @@
{
TxnId at = node.nextTxnId(ExclusiveSyncPoint, Domain.Range);
node.scheduler().once(() -> node.withEpoch(at.epoch(), () -> {
- CoordinateSyncPoint.coordinate(node, at, ranges)
+ CoordinateSyncPoint.exclusive(node, at, ranges)
.addCallback((success, fail) -> {
if (fail != null) logger.trace("Exception coordinating exclusive sync point for local shard durability of {}", ranges, fail);
else coordinateShardDurableAfterExclusiveSyncPoint(node, success);
diff --git a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java
index 5aea436..e9eea9b 100644
--- a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java
+++ b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java
@@ -18,10 +18,9 @@
package accord.impl;
-import java.util.Collection;
-import java.util.Collections;
-
import accord.local.Command;
+import accord.local.Command.TransientListener;
+import accord.local.Listeners;
import accord.local.SafeCommand;
import accord.local.SaveStatus;
import accord.primitives.TxnId;
@@ -70,9 +69,9 @@
}
@Override
- public Collection<Command.TransientListener> transientListeners()
+ public Listeners<TransientListener> transientListeners()
{
- return Collections.emptyList();
+ return Listeners.EMPTY;
}
@Override
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index ff9ebdc..fe47f27 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -20,13 +20,13 @@
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
@@ -38,6 +38,8 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,9 +75,9 @@
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
-import javax.annotation.Nullable;
import static accord.local.Command.NotDefined.uninitialised;
+
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.Status.Committed;
@@ -194,7 +196,7 @@
return attrs;
}
- private <O> O mapReduceForKey(InMemorySafeStore safeStore, Routables<?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+ private <O> O mapReduceForKey(InMemorySafeStore safeStore, Routables<?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, Predicate<O> terminate)
{
switch (keysOrRanges.domain()) {
default:
@@ -208,7 +210,7 @@
if (forKey.current() == null)
continue;
accumulate = map.apply(forKey.current(), accumulate);
- if (accumulate.equals(terminalValue))
+ if (terminate.test(accumulate))
return accumulate;
}
break;
@@ -222,7 +224,7 @@
if (forKey.value() == null)
continue;
accumulate = map.apply(forKey.value(), accumulate);
- if (accumulate.equals(terminalValue))
+ if (terminate.test(accumulate))
return accumulate;
}
}
@@ -532,9 +534,9 @@
return super.value(value);
}
- public Collection<Command.TransientListener> transientListeners()
+ public Listeners<Command.TransientListener> transientListeners()
{
- return transientListeners == null ? Collections.emptySet() : transientListeners;
+ return transientListeners == null ? Listeners.EMPTY : transientListeners;
}
}
@@ -553,6 +555,18 @@
}
}
+ private static class TimestampAndStatus
+ {
+ public final Timestamp timestamp;
+ public final Status status;
+
+ public TimestampAndStatus(Timestamp timestamp, Status status)
+ {
+ this.timestamp = timestamp;
+ this.status = status;
+ }
+ }
+
public static class InMemorySafeStore extends AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeCommandsForKey>
{
private final InMemoryCommandStore commandStore;
@@ -640,7 +654,7 @@
@Override
public Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
{
- Timestamp timestamp = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> Timestamp.nonNullOrMax(forKey.max(), prev), Timestamp.NONE, null);
+ Timestamp timestamp = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> Timestamp.nonNullOrMax(forKey.max(), prev), Timestamp.NONE, Objects::isNull);
Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
for (RangeCommand command : commandStore.rangeCommands.values())
{
@@ -701,6 +715,12 @@
@Override
public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
{
+ return mapReduceWithTerminate(keysOrRanges, slice, testKind, testTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, accumulate, Predicate.isEqual(terminalValue));
+ }
+
+ @Override
+ public <T> T mapReduceWithTerminate(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, Predicate<T> terminate)
+ {
accumulate = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> {
CommandTimeseries<?> timeseries;
switch (testTimestamp)
@@ -726,15 +746,15 @@
case MAY_EXECUTE_BEFORE:
remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
}
- return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
- }, accumulate, terminalValue);
+ return timeseries.mapReduceWithTerminate(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminate);
+ }, accumulate, terminate);
- if (accumulate.equals(terminalValue))
+ if (terminate.test(accumulate))
return accumulate;
// TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
- Map<Range, List<Map.Entry<TxnId, Timestamp>>> collect = new TreeMap<>(Range::compare);
+ Map<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> collect = new TreeMap<>(Range::compare);
commandStore.rangeCommands.forEach(((txnId, rangeCommand) -> {
Command command = rangeCommand.command.value();
// TODO (now): probably this isn't safe - want to ensure we take dependency on any relevant syncId
@@ -783,9 +803,9 @@
Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> {
// TODO (easy, efficiency): pass command as a parameter to Fold
- List<Map.Entry<TxnId, Timestamp>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
+ List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(command.txnId()))
- list.add(new AbstractMap.SimpleImmutableEntry<>(command.txnId(), command.executeAt()));
+ list.add(new AbstractMap.SimpleImmutableEntry<>(command.txnId(), new TimestampAndStatus(command.executeAt(), command.status())));
return in;
}, collect);
}));
@@ -814,20 +834,20 @@
Routables.foldl(ranges, sliced, (r, in, i) -> {
// TODO (easy, efficiency): pass command as a parameter to Fold
- List<Map.Entry<TxnId, Timestamp>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
+ List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(txnId))
- list.add(new AbstractMap.SimpleImmutableEntry<>(txnId, txnId));
+ list.add(new AbstractMap.SimpleImmutableEntry<>(txnId, new TimestampAndStatus(txnId, Status.NotDefined)));
return in;
}, collect);
}));
}
- for (Map.Entry<Range, List<Map.Entry<TxnId, Timestamp>>> e : collect.entrySet())
+ for (Map.Entry<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> e : collect.entrySet())
{
- for (Map.Entry<TxnId, Timestamp> command : e.getValue())
+ for (Map.Entry<TxnId, TimestampAndStatus> command : e.getValue())
{
T initial = accumulate;
- accumulate = map.apply(e.getKey(), command.getKey(), command.getValue(), initial);
+ accumulate = map.apply(e.getKey(), command.getKey(), command.getValue().timestamp, command.getValue().status, initial);
}
}
diff --git a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
index e8de7d8..9c4c2bd 100644
--- a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
+++ b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
@@ -18,11 +18,11 @@
package accord.impl;
-import java.util.Collection;
import java.util.function.Supplier;
import accord.impl.InMemoryCommandStore.GlobalCommand;
import accord.local.Command;
+import accord.local.Listeners;
import accord.local.SafeCommand;
import accord.primitives.TxnId;
@@ -72,7 +72,7 @@
}
@Override
- public Collection<Command.TransientListener> transientListeners()
+ public Listeners<Command.TransientListener> transientListeners()
{
return global.transientListeners();
}
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index 420d54d..ae39592 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -25,6 +25,10 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import accord.api.ProgressLog;
import accord.coordinate.FetchData;
@@ -37,6 +41,7 @@
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
+
import accord.local.SaveStatus.LocalExecution;
import accord.local.Status;
import accord.local.Status.Known;
@@ -51,7 +56,6 @@
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;
-import javax.annotation.Nullable;
import static accord.api.ProgressLog.ProgressShard.Unsure;
import static accord.coordinate.InformHomeOfTxn.inform;
@@ -74,8 +78,12 @@
// TODO (expected): report long-lived recurring transactions / operations
public class SimpleProgressLog implements ProgressLog.Factory
{
+ private static final Logger logger = LoggerFactory.getLogger(SimpleProgressLog.class);
+
enum Progress { NoneExpected, Expected, NoProgress, Investigating, Done }
+ public static volatile boolean PAUSE_FOR_TEST = false;
+
enum CoordinateStatus
{
NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done;
@@ -690,6 +698,12 @@
isScheduled = false;
try
{
+ if (PAUSE_FOR_TEST)
+ {
+ logger.info("Skipping progress log because it is paused for test");
+ return;
+ }
+
for (State.Monitoring run : this)
{
if (run.shouldRun())
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java
index 638e5ea..41ce28e 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -23,13 +23,11 @@
import java.util.Set;
import java.util.function.BiConsumer;
-import javax.annotation.Nullable;
-
import accord.api.Agent;
import accord.api.DataStore;
+import accord.api.DataStore.FetchRanges;
import accord.api.DataStore.FetchResult;
import accord.api.DataStore.StartingRangeFetch;
-import accord.api.DataStore.FetchRanges;
import accord.coordinate.CoordinateNoOp;
import accord.coordinate.CoordinateSyncPoint;
import accord.primitives.Ranges;
@@ -40,6 +38,7 @@
import accord.utils.Invariants;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
+import javax.annotation.Nullable;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.PreLoadContext.empty;
@@ -133,7 +132,7 @@
// of these ranges as part of this attempt
Ranges commitRanges = valid;
store.markBootstrapping(safeStore0, globalSyncId, valid);
- CoordinateSyncPoint.coordinate(node, globalSyncId, commitRanges)
+ CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
// TODO (correcness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges!
// ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed
.flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys()), safeStore1 -> {
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 18d0d60..49ebb5c 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -363,7 +363,7 @@
@Override
public abstract Status.Durability durability();
@Override
- public abstract Listeners.Immutable durableListeners();
+ public abstract Listeners.Immutable<DurableAndIdempotentListener> durableListeners();
public abstract SaveStatus saveStatus();
static boolean isSameClass(Command command, Class<? extends Command> klass)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index 9a0f846..ae45381 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -40,7 +40,6 @@
import accord.api.ProgressLog;
import accord.coordinate.CollectDeps;
import accord.local.Command.WaitingOn;
-import accord.local.CommandStores.RangesForEpoch;
import accord.primitives.FullRoute;
import accord.primitives.KeyDeps;
import accord.primitives.Participants;
@@ -57,6 +56,7 @@
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
+import accord.local.CommandStores.RangesForEpoch;
import static accord.api.ConfigurationService.EpochReady.DONE;
import static accord.local.PreLoadContext.contextFor;
@@ -205,7 +205,7 @@
SafeCommandStore.TestTimestamp.STARTED_AFTER, Timestamp.NONE,
SafeCommandStore.TestDep.ANY_DEPS, null,
Status.Applied, Status.Applied,
- (key, txnId, executeAt, max) -> Timestamp.max(max, executeAt),
+ (key, txnId, executeAt, status, max) -> Timestamp.max(max, executeAt),
Timestamp.NONE, Timestamp.MAX);
}
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index 357f8cf..a1308b0 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -56,6 +56,9 @@
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2ObjectHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static accord.api.ConfigurationService.EpochReady.done;
import static accord.local.PreLoadContext.empty;
import static accord.primitives.Routables.Slice.Minimal;
@@ -67,6 +70,9 @@
*/
public abstract class CommandStores
{
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(CommandStores.class);
+
public interface Factory
{
CommandStores create(NodeTimeService time,
@@ -215,6 +221,11 @@
return allInternal(0, ceilIndex(toExclusive));
}
+ public @Nonnull Ranges allAfter(long fromInclusive)
+ {
+ return allInternal(floorIndex(fromInclusive), ranges.length);
+ }
+
public @Nonnull Ranges allUntil(long toInclusive)
{
return allInternal(0, 1 + floorIndex(toInclusive));
@@ -456,6 +467,12 @@
return null;
}
+
+ @Override
+ public String toString()
+ {
+ return forEach.getClass().getName();
+ }
});
}
diff --git a/accord-core/src/main/java/accord/local/Listeners.java b/accord-core/src/main/java/accord/local/Listeners.java
index 84849af..4afd6b4 100644
--- a/accord-core/src/main/java/accord/local/Listeners.java
+++ b/accord-core/src/main/java/accord/local/Listeners.java
@@ -25,6 +25,8 @@
public class Listeners<L extends Command.Listener> extends DeterministicSet<L>
{
+ public static Listeners EMPTY = new Listeners<>();
+
public Listeners()
{
}
@@ -34,7 +36,7 @@
super(copy);
}
- public static class Immutable extends Listeners<Command.DurableAndIdempotentListener>
+ public static class Immutable<L extends Command.Listener> extends Listeners<L>
{
public static final Immutable EMPTY = new Immutable();
@@ -43,18 +45,18 @@
super();
}
- public Immutable(Listeners<Command.DurableAndIdempotentListener> listeners)
+ public Immutable(Listeners<L> listeners)
{
super(listeners);
}
- Listeners<Command.DurableAndIdempotentListener> mutable()
+ Listeners<L> mutable()
{
return new Listeners<>(this);
}
@Override
- public boolean add(Command.DurableAndIdempotentListener item)
+ public boolean add(L item)
{
throw new UnsupportedOperationException("Cannot modify immutable set");
}
@@ -72,7 +74,7 @@
}
@Override
- public boolean addAll(Collection<? extends Command.DurableAndIdempotentListener> c)
+ public boolean addAll(Collection<? extends L> c)
{
throw new UnsupportedOperationException("Cannot modify immutable set");
}
@@ -84,7 +86,7 @@
}
@Override
- public boolean removeIf(Predicate<? super Command.DurableAndIdempotentListener> filter)
+ public boolean removeIf(Predicate<? super L> filter)
{
throw new UnsupportedOperationException("Cannot modify immutable set");
}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index f0ee2bd..6492ddf 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -40,6 +40,8 @@
import accord.api.Agent;
import accord.api.ConfigurationService;
+
+import accord.api.BarrierType;
import accord.api.ConfigurationService.EpochReady;
import accord.api.DataStore;
import accord.api.Key;
@@ -66,6 +68,7 @@
import accord.primitives.ProgressToken;
import accord.primitives.Range;
import accord.primitives.Ranges;
+import accord.coordinate.Barrier;
import accord.primitives.Routable.Domain;
import accord.primitives.Routables;
import accord.primitives.Route;
@@ -524,6 +527,26 @@
return new TxnId(uniqueNow(), rw, domain);
}
+ /**
+ * Trigger one of several different kinds of barrier transactions on a key or range with different properties. Barriers ensure that all prior transactions
+ * have their side effects visible up to some point.
+ *
+ * Local barriers will look for a local transaction that was applied in minEpoch or later and returns when one exists or completes.
+ * It may, but it is not guaranteed to, trigger a global barrier transaction that effects the barrier at all replicas.
+ *
+ * A global barrier is guaranteed to create a distributed barrier transaction, and if it is synchronous will not return until the
+ * transaction has applied at a quorum globally (meaning all dependencies and their side effects are already visible). If it is asynchronous
+ * it will return once the barrier has been applied locally.
+ *
+ * Ranges are only supported for global barriers.
+ *
+ * Returns the Timestamp the barrier actually ended up occurring at. Keep in mind for local barriers it doesn't mean a new transaction was created.
+ */
+ public AsyncResult<Timestamp> barrier(Seekables keysOrRanges, long minEpoch, BarrierType barrierType)
+ {
+ return Barrier.barrier(this, keysOrRanges, minEpoch, barrierType);
+ }
+
public AsyncResult<Result> coordinate(Txn txn)
{
return coordinate(nextTxnId(txn.kind(), txn.keys().domain()), txn);
@@ -652,7 +675,12 @@
return future;
}
- public void receive(Request request, Id from, ReplyContext replyContext)
+ public void receive (Request request, Id from, ReplyContext replyContext)
+ {
+ receive(request, from, replyContext, 0);
+ }
+
+ public void receive (Request request, Id from, ReplyContext replyContext, long delayNanos)
{
long knownEpoch = request.knownEpoch();
if (knownEpoch > topology.epoch())
@@ -665,7 +693,7 @@
return;
}
}
- scheduler.now(() -> {
+ Runnable processMsg = () -> {
try
{
request.process(this, from, replyContext);
@@ -674,7 +702,11 @@
{
reply(from, replyContext, null, t);
}
- });
+ };
+ if (delayNanos > 0)
+ scheduler.once(processMsg, delayNanos, TimeUnit.NANOSECONDS);
+ else
+ scheduler.now(processMsg);
}
public Scheduler scheduler()
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java
index 0ef8f55..cacf81a 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -185,6 +185,8 @@
static PreLoadContext empty()
{
- return contextFor(null, Collections.emptyList(), Keys.EMPTY);
+ return EMPTY_PRELOADCONTEXT;
}
+
+ PreLoadContext EMPTY_PRELOADCONTEXT = contextFor(null, Collections.emptyList(), Keys.EMPTY);
}
diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java b/accord-core/src/main/java/accord/local/SafeCommand.java
index 38de775..a02e294 100644
--- a/accord-core/src/main/java/accord/local/SafeCommand.java
+++ b/accord-core/src/main/java/accord/local/SafeCommand.java
@@ -18,9 +18,8 @@
package accord.local;
-import java.util.Collection;
-
import accord.api.Result;
+import accord.local.Command.TransientListener;
import accord.local.Command.Truncated;
import accord.primitives.Ballot;
import accord.primitives.Timestamp;
@@ -42,7 +41,7 @@
public abstract boolean invalidated();
public abstract void addListener(Command.TransientListener listener);
public abstract boolean removeListener(Command.TransientListener listener);
- public abstract Collection<Command.TransientListener> transientListeners();
+ public abstract Listeners<Command.TransientListener> transientListeners();
public boolean isEmpty()
{
@@ -69,6 +68,12 @@
return update(Command.addListener(current(), listener));
}
+ public void addAndInvokeListener(SafeCommandStore safeStore, TransientListener listener)
+ {
+ addListener(listener);
+ listener.onChange(safeStore, this);
+ }
+
public Command removeListener(Command.DurableAndIdempotentListener listener)
{
Command current = current();
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 559f859..eb9574e 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -18,7 +18,7 @@
package accord.local;
-import java.util.Collection;
+import java.util.Iterator;
import java.util.function.Predicate;
import accord.api.Agent;
@@ -36,7 +36,6 @@
import accord.primitives.Txn.Kind;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
-
import javax.annotation.Nullable;
import static accord.local.Commands.Cleanup.NO;
@@ -55,7 +54,7 @@
{
public interface CommandFunction<I, O>
{
- O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, I in);
+ O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, Status status, I in);
}
public enum TestTimestamp
@@ -213,10 +212,15 @@
* Visits keys first and then ranges, both in ascending order.
* Within each key or range visits TxnId in ascending order of queried timestamp.
*/
- public abstract <T> T mapReduce(Seekables<?, ?> keys, Ranges slice,
+ public abstract <T> T mapReduceWithTerminate(Seekables<?, ?> keysOrRanges, Ranges slice,
+ TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+ TestDep testDep, @Nullable TxnId depId,
+ @Nullable Status minStatus, @Nullable Status maxStatus,
+ CommandFunction<T, T> map, T accumulate, Predicate<T> terminate);
+ public abstract <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice,
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus,
- CommandFunction<T, T> map, T initialValue, T terminalValue);
+ CommandFunction<T, T> map, T accumulate, T terminalValue);
protected abstract void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command);
protected abstract void register(Seekable keyOrRange, Ranges slice, Command command);
@@ -254,13 +258,21 @@
notifyListeners(safeCommand, command, command.durableListeners(), safeCommand.transientListeners());
}
- public void notifyListeners(SafeCommand safeCommand, Command command, Listeners<Command.DurableAndIdempotentListener> durableListeners, Collection<Command.TransientListener> transientListeners)
+ public void notifyListeners(SafeCommand safeCommand, Command command, Listeners<Command.DurableAndIdempotentListener> durableListeners, Listeners<Command.TransientListener> transientListeners)
{
- for (Command.DurableAndIdempotentListener listener : durableListeners)
+ Iterator<Command.DurableAndIdempotentListener> durableIterator = durableListeners.reverseIterator();
+ while (durableIterator.hasNext())
+ {
+ Command.DurableAndIdempotentListener listener = durableIterator.next();
notifyListener(this, safeCommand, command, listener);
+ }
- for (Command.TransientListener listener : transientListeners)
+ Iterator<Command.TransientListener> transientIterator = transientListeners.reverseIterator();
+ while (transientIterator.hasNext())
+ {
+ Command.TransientListener listener = transientIterator.next();
notifyListener(this, safeCommand, command, listener);
+ }
}
public static void notifyListener(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, Command.TransientListener listener)
diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java
index 2e1d201..3947bff 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -21,6 +21,7 @@
import javax.annotation.Nullable;
import accord.api.Result;
+import accord.api.RoutingKey;
import accord.local.Commands;
import accord.local.Node;
import accord.local.Node.Id;
@@ -130,9 +131,15 @@
node.mapReduceConsumeLocal(this, txnId.epoch(), executeAt.epoch(), this);
}
+
@Override
public ApplyReply apply(SafeCommandStore safeStore)
{
+ return apply(safeStore, txn, txnId, executeAt, deps, scope, writes, result, progressKey);
+ }
+
+ public static ApplyReply apply(SafeCommandStore safeStore, PartialTxn txn, TxnId txnId, Timestamp executeAt, PartialDeps deps, PartialRoute<?> scope, Writes writes, Result result, RoutingKey progressKey)
+ {
SafeCommand safeCommand = safeStore.get(txnId, executeAt, scope);
switch (Commands.apply(safeStore, safeCommand, txnId, scope, progressKey, executeAt, deps, txn, writes, result))
{
diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
new file mode 100644
index 0000000..79a5b35
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.messages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.local.CommandStore;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.messages.Apply.ApplyReply;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+
+/*
+ * Used by local and global inclusive sync points to effect the sync point at each node
+ * Combines commit, execute (with nothing really to execute), and apply into one request/response
+ */
+public class ApplyThenWaitUntilApplied extends WaitUntilApplied
+{
+ private static final Logger logger = LoggerFactory.getLogger(ReadData.class);
+
+ @SuppressWarnings("unused")
+ public static class SerializerSupport
+ {
+ public static ApplyThenWaitUntilApplied create(TxnId txnId, PartialRoute<?> route, PartialDeps deps, Seekables<?, ?> partialTxnKeys, Writes writes, Result result, boolean notifyAgent)
+ {
+ return new ApplyThenWaitUntilApplied(txnId, route, deps, partialTxnKeys, writes, result, notifyAgent);
+ }
+ }
+
+ public final PartialRoute<?> route;
+ public final PartialDeps deps;
+ public final Writes writes;
+ public final Result txnResult;
+ public final boolean notifyAgent;
+ public final Seekables<?, ?> partialTxnKeys;
+
+ ApplyThenWaitUntilApplied(TxnId txnId, PartialRoute<?> route, PartialDeps deps, Seekables<?, ?> partialTxnKeys, Writes writes, Result txnResult, boolean notifyAgent)
+ {
+ super(txnId, partialTxnKeys.toParticipants(), txnId, txnId.epoch());
+ this.route = route;
+ this.deps = deps;
+ this.writes = writes;
+ this.txnResult = txnResult;
+ this.notifyAgent = notifyAgent;
+ this.partialTxnKeys = partialTxnKeys;
+ }
+
+ @Override
+ public ReadType kind()
+ {
+ return ReadType.applyThenWaitUntilApplied;
+ }
+
+ @Override
+ public ReadNack apply(SafeCommandStore safeStore)
+ {
+ RoutingKey progressKey = TxnRequest.progressKey(node, txnId.epoch(), txnId, route);
+ ApplyReply applyReply = Apply.apply(safeStore, null, txnId, txnId, deps, route, writes, txnResult, progressKey);
+ switch (applyReply)
+ {
+ default:
+ throw new IllegalStateException("Unexpected ApplyReply");
+ case Insufficient:
+ throw new IllegalStateException("ApplyThenWaitUntilApplied is always sent with a maximal `Commit` so how can `Apply` have an `Insufficient` result");
+ case Redundant:
+ case Applied:
+ // In both cases it's fine to continue to process and return a response saying
+ // things were applied
+ break;
+ }
+ return super.apply(safeStore);
+ }
+
+ @Override
+ protected void readComplete(CommandStore commandStore, Data readResult, Ranges unavailable)
+ {
+ logger.trace("{}: readComplete ApplyThenWaitUntilApplied", txnId);
+ commandStore.execute(PreLoadContext.contextFor(txnId), safeStore -> {
+ super.readComplete(commandStore, readResult, unavailable);
+ }).begin(node.agent());
+ }
+
+ @Override
+ protected void onAllReadsComplete()
+ {
+ if (notifyAgent)
+ node.agent().onLocalBarrier(partialTxnKeys, txnId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "WaitForDependenciesThenApply{" +
+ "txnId:" + txnId +
+ '}';
+ }
+}
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 1efd652..e23a764 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -341,7 +341,7 @@
{
// any transaction that started
commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedBefore.rw()), STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
- (keyOrRange, txnId, executeAt, prev) -> {
+ (keyOrRange, txnId, executeAt, saveStatus, prev) -> {
if (executeAt.compareTo(startedBefore) > 0)
builder.add(keyOrRange, txnId);
return builder;
@@ -355,7 +355,7 @@
try (Deps.Builder builder = Deps.builder())
{
commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedBefore.rw()), STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null,
- (keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), (Deps.AbstractBuilder<Deps>)builder, null);
+ (keyOrRange, txnId, executeAt, saveStatus, prev) -> builder.add(keyOrRange, txnId), builder, (Deps.AbstractBuilder<Deps>)null);
return builder.build();
}
}
@@ -370,7 +370,7 @@
* has not witnessed us we can safely invalidate (us).
*/
return commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedAfter.rw()), STARTED_AFTER, startedAfter, WITHOUT, startedAfter, Accepted, PreCommitted,
- (keyOrRange, txnId, executeAt, prev) -> true, false, true);
+ (keyOrRange, txnId, executeAt, saveStatus, prev) -> true, false, true);
}
private static boolean hasCommittedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
@@ -383,6 +383,6 @@
* has not witnessed us we can safely invalidate it.
*/
return commandStore.mapReduce(keys, ranges, shouldHaveWitnessed(startedAfter.rw()), EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null,
- (keyOrRange, txnId, executeAt, prev) -> true,false, true);
+ (keyOrRange, txnId, executeAt, saveStatus, prev) -> true,false, true);
}
}
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index 97e67cc..2e30e05 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -18,7 +18,6 @@
package accord.messages;
import java.util.Set;
-
import javax.annotation.Nullable;
import org.slf4j.Logger;
@@ -49,6 +48,9 @@
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.Invariants;
+import accord.utils.TriFunction;
+
+import static accord.utils.Invariants.checkArgument;
public class Commit extends TxnRequest<ReadNack>
{
@@ -56,9 +58,9 @@
public static class SerializerSupport
{
- public static Commit create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+ public static Commit create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
{
- return new Commit(kind, txnId, scope, waitForEpoch, executeAt, partialTxn, partialDeps, fullRoute, read);
+ return new Commit(kind, txnId, scope, waitForEpoch, executeAt, partialTxn, partialDeps, fullRoute, readData);
}
}
@@ -67,14 +69,25 @@
public final @Nullable PartialTxn partialTxn;
public final PartialDeps partialDeps;
public final @Nullable FullRoute<?> route;
- public final ReadTxnData read;
+ public final ReadData readData;
public enum Kind { Minimal, Maximal }
// TODO (low priority, clarity): cleanup passing of topologies here - maybe fetch them afresh from Node?
// Or perhaps introduce well-named classes to represent different topology combinations
+
+ public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, ReadTxnData read)
+ {
+ this(kind, to, coordinateTopology, topologies, txnId, txn, route, executeAt, deps, (u1, u2, u3) -> read);
+ }
+
public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, @Nullable Participants<?> readScope, Timestamp executeAt, Deps deps, boolean read)
{
+ this(kind, to, coordinateTopology, topologies, txnId, txn, route, executeAt, deps, read ? new ReadTxnData(to, topologies, txnId, readScope, executeAt) : null);
+ }
+
+ public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, TriFunction<Txn, PartialRoute<?>, PartialDeps, ReadData> toExecuteFactory)
+ {
super(to, topologies, route, txnId);
FullRoute<?> sendRoute = null;
@@ -100,10 +113,10 @@
this.partialTxn = partialTxn;
this.partialDeps = deps.slice(scope.covering());
this.route = sendRoute;
- this.read = read ? new ReadTxnData(to, topologies, txnId, readScope, executeAt) : null;
+ this.readData = toExecuteFactory.apply(partialTxn != null ? partialTxn : txn, scope, partialDeps);
}
- Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+ Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
{
super(txnId, scope, waitForEpoch);
this.kind = kind;
@@ -111,7 +124,7 @@
this.partialTxn = partialTxn;
this.partialDeps = partialDeps;
this.route = fullRoute;
- this.read = read;
+ this.readData = readData;
}
// TODO (low priority, clarity): accept Topology not Topologies
@@ -141,6 +154,25 @@
}
}
+ public static void commitMaximalAndBlockOnDeps(Node node, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, Callback<ReadReply> callback)
+ {
+ checkArgument(topologies.size() == 1);
+ Topology topology = topologies.get(0);
+ for (Node.Id to : topology.nodes())
+ {
+ // To simplify making sure the agent is notified once and is notified before the barrier coordination
+ // returns a result; we never notify the agent on the coordinator as part of WaitForDependenciesThenApply execution
+ // Also always send a maximal commit since we don't block on deps often and that saves having to have an Insufficient code path
+ // going back for the Apply in `ApplyThenWaitUntilApplied
+ boolean notifyAgent = !to.equals(node.id());
+ Commit commit = new Commit(
+ Kind.Maximal, to, topology, topologies, txnId,
+ txn, route, txnId, deps,
+ (maybePartialTransaction, partialRoute, partialDeps) -> new ApplyThenWaitUntilApplied(txnId, partialRoute, partialDeps, maybePartialTransaction.keys().slice(partialDeps.covering), txn.execute(txnId, txnId, null), txn.result(txnId, txnId, null), notifyAgent));
+ node.send(to, commit, callback);
+ }
+ }
+
@Override
public TxnId primaryTxnId()
{
@@ -188,8 +220,8 @@
{
if (reply != null || failure != null)
node.reply(replyTo, replyContext, reply, failure);
- else if (read != null)
- read.process(node, replyTo, replyContext);
+ else if (readData != null)
+ readData.process(node, replyTo, replyContext);
}
@Override
@@ -210,7 +242,7 @@
", txnId: " + txnId +
", executeAt: " + executeAt +
", deps: " + partialDeps +
- ", read: " + read +
+ ", toExecute: " + readData +
'}';
}
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index aab5e0c..42f1ad4 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -17,54 +17,55 @@
*/
package accord.messages;
-import static accord.messages.MessageType.Kind.REMOTE;
import static accord.messages.MessageType.Kind.LOCAL;
+import static accord.messages.MessageType.Kind.REMOTE;
/**
* Meant to assist implementations with mapping accord messages to their own messaging systems.
*/
public enum MessageType
{
- SIMPLE_RSP (REMOTE, false),
- FAILURE_RSP (REMOTE, false),
- PRE_ACCEPT_REQ (REMOTE, true ),
- PRE_ACCEPT_RSP (REMOTE, false),
- ACCEPT_REQ (REMOTE, true ),
- ACCEPT_RSP (REMOTE, false),
- ACCEPT_INVALIDATE_REQ (REMOTE, true ),
- GET_DEPS_REQ (REMOTE, false),
- GET_DEPS_RSP (REMOTE, false),
- COMMIT_MINIMAL_REQ (REMOTE, true ),
- COMMIT_MAXIMAL_REQ (REMOTE, true ),
- COMMIT_INVALIDATE_REQ (REMOTE, true ),
- APPLY_MINIMAL_REQ (REMOTE, true ),
- APPLY_MAXIMAL_REQ (REMOTE, true ),
- APPLY_RSP (REMOTE, false),
- READ_REQ (REMOTE, false),
- READ_RSP (REMOTE, false),
- BEGIN_RECOVER_REQ (REMOTE, true ),
- BEGIN_RECOVER_RSP (REMOTE, false),
- BEGIN_INVALIDATE_REQ (REMOTE, true ),
- BEGIN_INVALIDATE_RSP (REMOTE, false),
- WAIT_ON_COMMIT_REQ (REMOTE, false),
- WAIT_ON_COMMIT_RSP (REMOTE, false),
- WAIT_ON_APPLY_REQ (REMOTE, false),
- INFORM_OF_TXN_REQ (REMOTE, true ),
- INFORM_DURABLE_REQ (REMOTE, true ),
- INFORM_HOME_DURABLE_REQ (REMOTE, true ),
- CHECK_STATUS_REQ (REMOTE, false),
- CHECK_STATUS_RSP (REMOTE, false),
- FETCH_DATA_REQ (REMOTE, false),
- FETCH_DATA_RSP (REMOTE, false),
- SET_SHARD_DURABLE_REQ (REMOTE, true ),
- SET_GLOBALLY_DURABLE_REQ (REMOTE, true ),
- QUERY_DURABLE_BEFORE_REQ (REMOTE, false),
- QUERY_DURABLE_BEFORE_RSP (REMOTE, false),
+ SIMPLE_RSP (REMOTE, false),
+ FAILURE_RSP (REMOTE, false),
+ PRE_ACCEPT_REQ (REMOTE, true ),
+ PRE_ACCEPT_RSP (REMOTE, false),
+ ACCEPT_REQ (REMOTE, true ),
+ ACCEPT_RSP (REMOTE, false),
+ ACCEPT_INVALIDATE_REQ (REMOTE, true ),
+ GET_DEPS_REQ (REMOTE, false),
+ GET_DEPS_RSP (REMOTE, false),
+ COMMIT_MINIMAL_REQ (REMOTE, true ),
+ COMMIT_MAXIMAL_REQ (REMOTE, true ),
+ COMMIT_INVALIDATE_REQ (REMOTE, true ),
+ APPLY_MINIMAL_REQ (REMOTE, true ),
+ APPLY_MAXIMAL_REQ (REMOTE, true ),
+ APPLY_RSP (REMOTE, false),
+ READ_REQ (REMOTE, false),
+ READ_RSP (REMOTE, false),
+ BEGIN_RECOVER_REQ (REMOTE, true ),
+ BEGIN_RECOVER_RSP (REMOTE, false),
+ BEGIN_INVALIDATE_REQ (REMOTE, true ),
+ BEGIN_INVALIDATE_RSP (REMOTE, false),
+ WAIT_ON_COMMIT_REQ (REMOTE, false),
+ WAIT_ON_COMMIT_RSP (REMOTE, false),
+ WAIT_UNTIL_APPLIED_REQ (REMOTE, false),
+ INFORM_OF_TXN_REQ (REMOTE, true ),
+ INFORM_DURABLE_REQ (REMOTE, true ),
+ INFORM_HOME_DURABLE_REQ (REMOTE, true ),
+ CHECK_STATUS_REQ (REMOTE, false),
+ CHECK_STATUS_RSP (REMOTE, false),
+ FETCH_DATA_REQ (REMOTE, false),
+ FETCH_DATA_RSP (REMOTE, false),
+ SET_SHARD_DURABLE_REQ (REMOTE, true ),
+ SET_GLOBALLY_DURABLE_REQ (REMOTE, true ),
+ QUERY_DURABLE_BEFORE_REQ (REMOTE, false),
+ QUERY_DURABLE_BEFORE_RSP (REMOTE, false),
+ APPLY_AND_WAIT_UNTIL_APPLIED_REQ (REMOTE, true ),
- PROPAGATE_PRE_ACCEPT_MSG (LOCAL, true ),
- PROPAGATE_COMMIT_MSG (LOCAL, true ),
- PROPAGATE_APPLY_MSG (LOCAL, true ),
- PROPAGATE_OTHER_MSG (LOCAL, true ),
+ PROPAGATE_PRE_ACCEPT_MSG (LOCAL, true ),
+ PROPAGATE_COMMIT_MSG (LOCAL, true ),
+ PROPAGATE_APPLY_MSG (LOCAL, true ),
+ PROPAGATE_OTHER_MSG (LOCAL, true ),
;
public enum Kind { LOCAL, REMOTE }
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index 1be51ca..ce72ceb 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -22,6 +22,9 @@
import java.util.Objects;
import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import accord.local.Command;
import accord.local.Commands;
import accord.local.Node.Id;
@@ -49,6 +52,9 @@
public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> implements EpochSupplier
{
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(PreAccept.class);
+
public static class SerializerSupport
{
public static PreAccept create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, long maxEpoch, PartialTxn partialTxn, @Nullable FullRoute<?> fullRoute)
@@ -257,7 +263,7 @@
// This is necessary for reporting to a bootstrapping replica which TxnId it must not prune from dependencies
// i.e. the source replica reports to the target replica those TxnId that STARTED_BEFORE and EXECUTES_AFTER.
commandStore.mapReduce(keys, ranges, testKind, STARTED_BEFORE, executeAt, ANY_DEPS, null, null, null,
- (keyOrRange, testTxnId, testExecuteAt, in) -> {
+ (keyOrRange, testTxnId, testExecuteAt, saveStatus, in) -> {
// TODO (easy, efficiency): either pass txnId as parameter or encode this behaviour in a specialised builder to avoid extra allocations
if (!testTxnId.equals(txnId))
in.add(keyOrRange, testTxnId);
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 66b5234..3eacce6 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -28,6 +28,7 @@
import accord.local.CommandStore;
import accord.local.Node;
import accord.local.SafeCommandStore;
+import accord.messages.ReadData.ReadNack;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
@@ -41,10 +42,40 @@
import static accord.messages.TxnRequest.latestRelevantEpochIndex;
// TODO (required, efficiency): dedup - can currently have infinite pending reads that will be executed independently
-public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack>
+public abstract class ReadData extends AbstractEpochRequest<ReadNack>
{
private static final Logger logger = LoggerFactory.getLogger(ReadData.class);
+ public enum ReadType
+ {
+ readTxnData(0),
+ waitUntilApplied(1),
+ applyThenWaitUntilApplied(2);
+
+ public final byte val;
+
+ ReadType(int val)
+ {
+ this.val = (byte)val;
+ }
+
+ @SuppressWarnings("unused")
+ public static ReadType fromValue(byte val)
+ {
+ switch (val)
+ {
+ case 0:
+ return readTxnData;
+ case 1:
+ return waitUntilApplied;
+ case 2:
+ return applyThenWaitUntilApplied;
+ default:
+ throw new IllegalArgumentException("Unrecognized ReadType value " + val);
+ }
+ }
+ }
+
// TODO (expected, cleanup): should this be a Route?
public final Participants<?> readScope;
private final long waitForEpoch;
@@ -68,10 +99,14 @@
this.waitForEpoch = waitForEpoch;
}
+ abstract public ReadType kind();
+
protected abstract void cancel();
protected abstract long executeAtEpoch();
protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail);
+ protected void onAllReadsComplete() {}
+
@Override
public long waitForEpoch()
{
@@ -128,7 +163,10 @@
// and prevents races where we respond before dispatching all the required reads (if the reads are
// completing faster than the reads can be setup on all required shards)
if (-1 == --waitingOnCount)
+ {
+ onAllReadsComplete();
reply(this.unavailable, data, null);
+ }
}
protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable)
@@ -236,4 +274,4 @@
return true;
}
}
-}
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/ReadTxnData.java
index 4e31910..5dec9ba 100644
--- a/accord-core/src/main/java/accord/messages/ReadTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java
@@ -18,8 +18,6 @@
package accord.messages;
-import javax.annotation.Nullable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +35,7 @@
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.topology.Topologies;
+import javax.annotation.Nullable;
import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
import static accord.local.Status.Committed;
@@ -99,6 +98,12 @@
}
@Override
+ public ReadType kind()
+ {
+ return ReadType.readTxnData;
+ }
+
+ @Override
protected long executeAtEpoch()
{
return executeAtEpoch;
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index 5a37fa9..f383e60 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -20,6 +20,7 @@
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
+import accord.primitives.Ranges;
import accord.primitives.SyncPoint;
import accord.utils.MapReduceConsume;
@@ -39,13 +40,13 @@
@Override
public void process()
{
- node.mapReduceConsumeLocal(this, exclusiveSyncPoint.ranges, waitForEpoch(), waitForEpoch(), this);
+ node.mapReduceConsumeLocal(this, exclusiveSyncPoint.keysOrRanges, waitForEpoch(), waitForEpoch(), this);
}
@Override
public SimpleReply apply(SafeCommandStore safeStore)
{
- safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, exclusiveSyncPoint.ranges);
+ safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges);
return Ok;
}
diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java b/accord-core/src/main/java/accord/messages/TxnRequest.java
index c324cf6..0025329 100644
--- a/accord-core/src/main/java/accord/messages/TxnRequest.java
+++ b/accord-core/src/main/java/accord/messages/TxnRequest.java
@@ -20,17 +20,22 @@
import java.util.function.BiFunction;
-import accord.local.SafeCommandStore;
-import accord.utils.MapReduceConsume;
-import accord.primitives.*;
-import accord.utils.Invariants;
-
import accord.api.RoutingKey;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Routables;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import accord.topology.Topologies;
import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.MapReduceConsume;
import static java.lang.Long.min;
@@ -76,11 +81,11 @@
}
@Override
- RoutingKey progressKey(Node node)
+ RoutingKey progressKey()
{
if (doNotComputeProgressKey)
return null;
- return super.progressKey(node);
+ return super.progressKey();
}
}
@@ -136,14 +141,19 @@
this.node = on;
this.replyTo = replyTo;
this.replyContext = replyContext;
- this.progressKey = progressKey(node); // TODO (low priority, clarity): not every class that extends TxnRequest needs this set
+ this.progressKey = progressKey(); // TODO (low priority, clarity): not every class that extends TxnRequest needs this set
process();
}
- RoutingKey progressKey(Node node)
+ RoutingKey progressKey()
+ {
+ return progressKey(node, waitForEpoch, txnId, scope);
+ }
+
+ public static RoutingKey progressKey(Node node, long waitForEpoch, TxnId txnId, PartialRoute<?> scope)
{
// if waitForEpoch < txnId.epoch, then this replica's ownership is unchanged
- long progressEpoch = min(waitForEpoch(), txnId.epoch());
+ long progressEpoch = min(waitForEpoch, txnId.epoch());
return node.trySelectProgressKey(progressEpoch, scope, scope.homeKey());
}
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index 682c33b..730055c 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -238,6 +238,12 @@
}
@Override
+ public ReadType kind()
+ {
+ return ReadType.waitUntilApplied;
+ }
+
+ @Override
protected void cancel()
{
node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> removeListener(in, txnId), node.agent()));
@@ -246,7 +252,7 @@
@Override
public MessageType type()
{
- return MessageType.WAIT_ON_APPLY_REQ;
+ return MessageType.WAIT_UNTIL_APPLIED_REQ;
}
@Override
diff --git a/accord-core/src/main/java/accord/primitives/Seekables.java b/accord-core/src/main/java/accord/primitives/Seekables.java
index 80929d7..07b2f22 100644
--- a/accord-core/src/main/java/accord/primitives/Seekables.java
+++ b/accord-core/src/main/java/accord/primitives/Seekables.java
@@ -19,6 +19,7 @@
package accord.primitives;
import accord.api.RoutingKey;
+import accord.primitives.Routable.Domain;
import static accord.primitives.Routables.Slice.Overlapping;
@@ -37,4 +38,9 @@
Participants<?> toParticipants();
FullRoute<?> toRoute(RoutingKey homeKey);
+
+ static Seekables<?, ?> of(Seekable seekable)
+ {
+ return seekable.domain() == Domain.Range ? Ranges.of(seekable.asRange()) : Keys.of(seekable.asKey());
+ }
}
diff --git a/accord-core/src/main/java/accord/primitives/SyncPoint.java b/accord-core/src/main/java/accord/primitives/SyncPoint.java
index da2b823..fedc64c 100644
--- a/accord-core/src/main/java/accord/primitives/SyncPoint.java
+++ b/accord-core/src/main/java/accord/primitives/SyncPoint.java
@@ -29,41 +29,44 @@
* so while these are processed much like a transaction, they are invisible to real transactions which
* may proceed before this is witnessed by the node processing it.
*/
-public class SyncPoint
+public class SyncPoint<S extends Seekables<?, ?>>
{
public static class SerializationSupport
{
- public static SyncPoint construct(TxnId syncId, Deps waitFor, Ranges ranges, RoutingKey homeKey)
+ public static SyncPoint construct(TxnId syncId, Deps waitFor, Seekables<?,?> keysOrRanges, RoutingKey homeKey, boolean finishedAsync)
{
- return new SyncPoint(syncId, waitFor, ranges, homeKey);
+ return new SyncPoint(syncId, waitFor, keysOrRanges, homeKey, finishedAsync);
}
}
public final TxnId syncId;
public final Deps waitFor;
- public final Ranges ranges;
+ public final S keysOrRanges;
public final RoutingKey homeKey;
+ public final boolean finishedAsync;
- public SyncPoint(TxnId syncId, Deps waitFor, Ranges ranges, FullRangeRoute route)
+ public SyncPoint(TxnId syncId, Deps waitFor, S keysOrRanges, FullRoute route, boolean finishedAsync)
{
- Invariants.checkArgument(ranges.toRoute(route.homeKey).equals(route), "Expected homeKey %s from route %s to be in ranges %s", route.homeKey, route, ranges);
+ Invariants.checkArgument(keysOrRanges.toRoute(route.homeKey()).equals(route), "Expected homeKey %s from route %s to be in ranges %s", route.homeKey(), route, keysOrRanges);
this.syncId = syncId;
this.waitFor = waitFor;
- this.ranges = ranges;
+ this.keysOrRanges = keysOrRanges;
this.homeKey = route.homeKey();
+ this.finishedAsync = finishedAsync;
}
- private SyncPoint(TxnId syncId, Deps waitFor, Ranges ranges, RoutingKey homeKey)
+ private SyncPoint(TxnId syncId, Deps waitFor, S keysOrRanges, RoutingKey homeKey, boolean finishedAsync)
{
this.syncId = syncId;
this.waitFor = waitFor;
- this.ranges = ranges;
+ this.keysOrRanges = keysOrRanges;
this.homeKey = homeKey;
+ this.finishedAsync = finishedAsync;
}
- public FullRangeRoute route()
+ public FullRoute route()
{
- return ranges.toRoute(homeKey);
+ return keysOrRanges.toRoute(homeKey);
}
// TODO (required): document this and its usages; make sure call-sites make sense
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 4f07a29..abf22e8 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -20,11 +20,10 @@
import accord.local.Node.Id;
import accord.utils.Invariants;
+import javax.annotation.Nonnull;
import static accord.utils.Invariants.checkArgument;
-import javax.annotation.Nonnull;
-
public class Timestamp implements Comparable<Timestamp>, EpochSupplier
{
public static final Timestamp MAX = new Timestamp(Long.MAX_VALUE, Long.MAX_VALUE, Id.MAX);
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java
index c3384a2..9349086 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -253,7 +253,7 @@
default Result result(TxnId txnId, Timestamp executeAt, @Nullable Data data)
{
- return query().compute(txnId, executeAt, data, read(), update());
+ return query().compute(txnId, executeAt, keys(), data, read(), update());
}
default Writes execute(TxnId txnId, Timestamp executeAt, @Nullable Data data)
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index aa3b5c6..27b43ef 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -35,8 +35,9 @@
public class Topology
{
- public static final Topology EMPTY = new Topology(0, new Shard[0], Ranges.EMPTY, Collections.emptyMap(), Ranges.EMPTY, new int[0]);
+ public static final long EMPTY_EPOCH = 0;
private static final int[] EMPTY_SUBSET = new int[0];
+ public static final Topology EMPTY = new Topology(EMPTY_EPOCH, new Shard[0], Ranges.EMPTY, Collections.emptyMap(), Ranges.EMPTY, EMPTY_SUBSET);
final long epoch;
final Shard[] shards;
final Ranges ranges;
diff --git a/accord-core/src/main/java/accord/utils/DeterministicSet.java b/accord-core/src/main/java/accord/utils/DeterministicSet.java
index 7087e9e..ec0942f 100644
--- a/accord-core/src/main/java/accord/utils/DeterministicSet.java
+++ b/accord-core/src/main/java/accord/utils/DeterministicSet.java
@@ -104,6 +104,29 @@
};
}
+ public Iterator<T> reverseIterator()
+ {
+ return new Iterator<T>()
+ {
+ Entry<T> previous = head.prev;
+ @Override
+ public boolean hasNext()
+ {
+ return previous != head;
+ }
+
+ @Override
+ public T next()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ T result = previous.item;
+ previous = previous.prev;
+ return result;
+ }
+ };
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
index 0c54622..9f878c1 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
@@ -18,17 +18,22 @@
package accord.utils.async;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
+import static accord.utils.Invariants.checkArgument;
+
public class AsyncCallbacks
{
- public static <T> BiConsumer<? super T, Throwable> inExecutor(BiConsumer<? super T, Throwable> callback, Executor executor)
+ public static <T> BiConsumer<? super T, Throwable> inExecutorService(BiConsumer<? super T, Throwable> callback, ExecutorService es)
{
+ // Checking for shutdown once here for the other `inExecutorService` as well as `AsyncChain.addCallback`
+ // So we don't repeat the check
+ checkArgument(!es.isShutdown(), "ExecutorService is shutdown");
return (result, throwable) -> {
try
{
- executor.execute(() -> callback.accept(result, throwable));
+ es.execute(() -> callback.accept(result, throwable));
}
catch (Throwable t)
{
@@ -38,9 +43,9 @@
}
- public static <T> BiConsumer<? super T, Throwable> inExecutor(Runnable runnable, Executor executor)
+ public static <T> BiConsumer<? super T, Throwable> inExecutorService(Runnable runnable, ExecutorService es)
{
- return inExecutor(toCallback(runnable), executor);
+ return inExecutorService(toCallback(runnable), es);
}
public static <T> BiConsumer<T, Throwable> toCallback(Runnable runnable) {
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index 130731d..43cc6db 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -19,6 +19,7 @@
package accord.utils.async;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -83,14 +84,14 @@
return addCallback(AsyncCallbacks.toCallback(runnable));
}
- default AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor)
+ default AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback, ExecutorService es)
{
- return addCallback(AsyncCallbacks.inExecutor(callback, executor));
+ return addCallback(AsyncCallbacks.inExecutorService(callback, es));
}
- default AsyncChain<V> addCallback(Runnable runnable, Executor executor)
+ default AsyncChain<V> addCallback(Runnable runnable, ExecutorService es)
{
- return addCallback(AsyncCallbacks.inExecutor(runnable, executor));
+ return addCallback(AsyncCallbacks.inExecutorService(runnable, es));
}
/**
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index ae5a1ce..08ea070 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -593,30 +593,14 @@
public static <V> V getBlocking(AsyncChain<V> chain) throws InterruptedException, ExecutionException
{
- class Result
+ try
{
- final V result;
- final Throwable failure;
-
- public Result(V result, Throwable failure)
- {
- this.result = result;
- this.failure = failure;
- }
+ return getBlocking(chain, 0, TimeUnit.DAYS);
}
-
- AtomicReference<Result> callbackResult = new AtomicReference<>();
- CountDownLatch latch = new CountDownLatch(1);
-
- chain.begin((result, failure) -> {
- callbackResult.set(new Result(result, failure));
- latch.countDown();
- });
-
- latch.await();
- Result result = callbackResult.get();
- if (result.failure == null) return result.result;
- else throw new ExecutionException(result.failure);
+ catch (TimeoutException e)
+ {
+ throw new IllegalStateException("Should not throw timeout exception e");
+ }
}
public static <V> V getBlockingAndRethrow(AsyncChain<V> chain)
@@ -677,8 +661,14 @@
latch.countDown();
});
- if (!latch.await(timeout, unit))
- throw new TimeoutException();
+ if (timeout > 0)
+ {
+ if (!latch.await(timeout, unit))
+ throw new TimeoutException();
+ }
+ else
+ latch.await();
+
Result result = callbackResult.get();
if (result.failure == null) return result.result;
else throw new ExecutionException(result.failure);
@@ -686,6 +676,18 @@
public static <V> V getUninterruptibly(AsyncChain<V> chain) throws ExecutionException
{
+ try
+ {
+ return getUninterruptibly(chain, 0, TimeUnit.DAYS);
+ }
+ catch (TimeoutException e)
+ {
+ throw new IllegalStateException("Should not throw timeout exception e");
+ }
+ }
+
+ public static <V> V getUninterruptibly(AsyncChain<V> chain, long time, TimeUnit unit) throws ExecutionException, TimeoutException
+ {
boolean interrupted = false;
try
{
@@ -693,7 +695,7 @@
{
try
{
- return getBlocking(chain);
+ return getBlocking(chain, time, unit);
}
catch (InterruptedException e)
{
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
index 3269f7a..944d517 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
@@ -18,7 +18,6 @@
package accord.utils.async;
-import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
/**
@@ -36,18 +35,6 @@
return addCallback(AsyncCallbacks.toCallback(runnable));
}
- @Override
- default AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor)
- {
- return addCallback(AsyncCallbacks.inExecutor(callback, executor));
- }
-
- @Override
- default AsyncResult<V> addCallback(Runnable runnable, Executor executor)
- {
- return addCallback(AsyncCallbacks.inExecutor(runnable, executor));
- }
-
boolean isDone();
boolean isSuccess();
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 5773e68..ad7171f 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -46,7 +46,7 @@
}
}
- static class AbstractResult<V> implements AsyncResult<V>
+ public static class AbstractResult<V> implements AsyncResult<V>
{
private static final AtomicReferenceFieldUpdater<AbstractResult, Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, Object.class, "state");
@@ -65,6 +65,17 @@
private void notify(Listener<V> listener, Result<V> result)
{
+ Listener<V> reversed = null;
+ Listener<V> tmp;
+ while (listener != null)
+ {
+ tmp = listener;
+ listener = listener.next;
+ tmp.next = reversed;
+ reversed = tmp;
+ }
+ listener = reversed;
+
List<Throwable> failures = null;
while (listener != null)
{
@@ -109,6 +120,15 @@
return trySetResult(new Result<>(result, failure));
}
+ protected boolean trySuccess(V value)
+ {
+ return trySetResult(value, null);
+ }
+ protected boolean tryFailure(Throwable throwable)
+ {
+ return trySetResult(null, throwable);
+ }
+
private AsyncChain<V> newChain()
{
return new AsyncChains.Head<V>()
@@ -222,13 +242,13 @@
@Override
public boolean trySuccess(V value)
{
- return trySetResult(value, null);
+ return super.trySuccess(value);
}
@Override
public boolean tryFailure(Throwable throwable)
{
- return trySetResult(null, throwable);
+ return super.tryFailure(throwable);
}
}
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 6d42388..3fc3ac9 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -25,6 +25,8 @@
import com.google.common.collect.Sets;
+import java.time.Duration;
+
import accord.api.MessageSink;
import accord.api.Scheduler;
import accord.config.LocalConfig;
@@ -53,6 +55,9 @@
import accord.utils.Invariants;
import accord.utils.ThreadPoolScheduler;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+
import static accord.utils.async.AsyncChains.awaitUninterruptibly;
public class Utils
@@ -161,4 +166,19 @@
awaitUninterruptibly(node.unsafeStart());
return node;
}
+
+ public static void spinUntilSuccess(ThrowingRunnable runnable)
+ {
+ spinUntilSuccess(runnable, 10);
+ }
+
+ public static void spinUntilSuccess(ThrowingRunnable runnable, int timeoutInSeconds)
+ {
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(100))
+ .pollDelay(0, TimeUnit.MILLISECONDS)
+ .atMost(timeoutInSeconds, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(runnable);
+ }
}
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
index 96be9fe..e12cb19 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
@@ -18,48 +18,94 @@
package accord.coordinate;
-import accord.local.Node;
-import accord.impl.mock.MockCluster;
-import accord.api.Result;
-import accord.impl.mock.MockStore;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
-import org.junit.jupiter.api.Assertions;
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import accord.api.Agent;
+import accord.api.BarrierType;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.impl.SimpleProgressLog;
+import accord.impl.TestAgent;
+import accord.impl.mock.MockCluster;
+import accord.impl.mock.MockStore;
+import accord.local.Command;
+import accord.local.Command.TransientListener;
+import accord.local.Commands;
+import accord.local.Commands.AcceptOutcome;
+import accord.local.Commands.ApplyOutcome;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.messages.ReadData.ReadOk;
import accord.primitives.FullKeyRoute;
import accord.primitives.FullRangeRoute;
import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import static accord.Utils.id;
import static accord.Utils.ids;
import static accord.Utils.ranges;
+import static accord.Utils.spinUntilSuccess;
import static accord.Utils.writeTxn;
+import static accord.impl.IntKey.key;
import static accord.impl.IntKey.keys;
import static accord.impl.IntKey.range;
+import static accord.local.Status.Applied;
import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Txn.Kind.Read;
import static accord.primitives.Txn.Kind.Write;
+import static accord.utils.Invariants.checkState;
import static accord.utils.async.AsyncChains.getUninterruptibly;
+import static com.google.common.base.Predicates.alwaysTrue;
+import static java.lang.Thread.sleep;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
public class CoordinateTransactionTest
{
+ private static final Logger logger = LoggerFactory.getLogger(CoordinateTransactionTest.class);
+
+ @AfterEach
+ public void tearDown()
+ {
+ SimpleProgressLog.PAUSE_FOR_TEST = false;
+ }
+
@Test
void simpleTest() throws Throwable
{
try (MockCluster cluster = MockCluster.builder().build())
{
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
TxnId txnId = node.nextTxnId(Write, Key);
Keys keys = keys(10);
Txn txn = writeTxn(keys);
FullKeyRoute route = keys.toRoute(keys.get(0).toUnseekable());
Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, route));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
}
}
@@ -69,14 +115,14 @@
try (MockCluster cluster = MockCluster.builder().build())
{
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
TxnId txnId = node.nextTxnId(Write, Key);
Ranges keys = ranges(range(1, 2));
Txn txn = writeTxn(keys);
FullRangeRoute route = keys.toRoute(keys.get(0).someIntersectingRoutingKey(null));
Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, route));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
}
}
@@ -86,7 +132,7 @@
try (MockCluster cluster = MockCluster.builder().build())
{
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
TxnId oldId1 = node.nextTxnId(Write, Key);
TxnId oldId2 = node.nextTxnId(Write, Key);
@@ -98,11 +144,11 @@
Txn txn = writeTxn(keys);
FullKeyRoute route = keys.toRoute(keys.get(0).someIntersectingRoutingKey(null));
getUninterruptibly(CoordinateTransaction.coordinate(node, oldId1, txn, route));
- Assertions.fail();
+ fail();
}
catch (ExecutionException e)
{
- Assertions.assertEquals(Invalidated.class, e.getCause().getClass());
+ assertEquals(Invalidated.class, e.getCause().getClass());
}
Keys keys = keys(2);
@@ -113,6 +159,150 @@
}
@Test
+ void barrierTest() throws Throwable
+ {
+ try (MockCluster cluster = MockCluster.builder().build())
+ {
+ Node node = cluster.get(1);
+ Agent agent = node.agent();
+ assertNotNull(node);
+ long epoch = node.epoch();
+
+ // This is checking for a local barrier so it should succeed even if we drop the completion messages from the other nodes
+ cluster.networkFilter.addFilter(id -> ImmutableSet.of(cluster.get(2).id(), cluster.get(3).id()).contains(id), alwaysTrue(), message -> message instanceof ReadOk);
+ // Should create a sync transaction since no pre-existing one can be used and return as soon as it is locally applied
+ Barrier<Keys> localInitiatingBarrier = Barrier.barrier(node, Keys.of(key(3)), node.epoch(), BarrierType.local);
+ // Sync transaction won't be created until callbacks for existing transaction check runs
+ Semaphore existingTransactionCheckCompleted = new Semaphore(0);
+ localInitiatingBarrier.existingTransactionCheck.addCallback((ignored1, ignored2) -> existingTransactionCheckCompleted.release());
+ assertTrue(existingTransactionCheckCompleted.tryAcquire(5, TimeUnit.SECONDS));
+ // It's possible for the callback to run before the sync point is created in a different callback
+ // because multiple callbacks can run concurrently. `addCallback` might see the finished result
+ // and run immediately in this thread
+ spinUntilSuccess(() -> checkState(localInitiatingBarrier.coordinateSyncPoint != null));
+ // Should be able to find the txnid now and wait for local application
+ TxnId initiatingBarrierSyncTxnId = localInitiatingBarrier.coordinateSyncPoint.txnId;
+ Semaphore barrierAppliedLocally = new Semaphore(0);
+ node.ifLocal(PreLoadContext.contextFor(initiatingBarrierSyncTxnId), key(3).toUnseekable(), epoch, (safeStore) ->
+ safeStore.get(initiatingBarrierSyncTxnId, key(3).toUnseekable()).addAndInvokeListener(
+ safeStore,
+ commandListener((safeStore2, command) -> {
+ if (command.current().is(Applied))
+ barrierAppliedLocally.release();
+ }))
+ ).begin(agent);
+ assertTrue(barrierAppliedLocally.tryAcquire(5, TimeUnit.SECONDS));
+ // If the command is locally applied the future for the barrier should be completed as well and not waiting on messages from other nodes
+ getUninterruptibly(localInitiatingBarrier, 1, TimeUnit.SECONDS);
+ cluster.networkFilter.clear();
+
+ Keys globalSyncBarrierKeys = keys(2, 3);
+ // At least one other should have completed by the time it is locally applied, a down node should be fine since it is quorum
+ cluster.networkFilter.isolate(cluster.get(2).id());
+ Barrier<Keys> globalInitiatingBarrier = Barrier.barrier(node, globalSyncBarrierKeys, node.epoch(), BarrierType.global_sync);
+ Timestamp globalBarrierTimestamp = getUninterruptibly(globalInitiatingBarrier);
+ int localBarrierCount = ((TestAgent)agent).completedLocalBarriers.getOrDefault(globalBarrierTimestamp, new AtomicInteger(0)).get();
+ assertNotNull(globalInitiatingBarrier.coordinateSyncPoint);
+ assertEquals(2, localBarrierCount);
+ logger.info("Local barrier count " + localBarrierCount);
+ cluster.networkFilter.clear();
+
+ // The existing barrier should suffice here
+ Barrier<Keys> nonInitiatingLocalBarrier = Barrier.barrier(node, Keys.of(key(2)), node.epoch(), BarrierType.local);
+ Timestamp previousBarrierTimestamp = getUninterruptibly(nonInitiatingLocalBarrier);
+ assertNull(nonInitiatingLocalBarrier.coordinateSyncPoint);
+ assertEquals(previousBarrierTimestamp, getUninterruptibly(nonInitiatingLocalBarrier));
+ assertEquals(previousBarrierTimestamp, globalBarrierTimestamp);
+
+ // Sync over nothing should work
+ SyncPoint<Ranges> syncPoint = getUninterruptibly(getUninterruptibly(CoordinateSyncPoint.inclusive(node, ranges(range(99, 100)), false)));
+ assertEquals(node.epoch(), syncPoint.syncId.epoch());
+
+ // Keys and so on for the upcoming transaction pair
+ Keys keys = keys(1);
+ accord.api.Key key = keys.get(0);
+ RoutingKey homeKey = key.toUnseekable();
+ Ranges ranges = ranges(homeKey.asRange());
+ FullKeyRoute route = keys.toRoute(homeKey);
+ TxnId txnId = node.nextTxnId(Write, Key);
+
+ // Create a txn to block the one we are about to create after this
+ SimpleProgressLog.PAUSE_FOR_TEST = true;
+ TxnId blockingTxnId = new TxnId(txnId.epoch(), 1, Read, Key, new Id(1));
+ Txn blockingTxn = agent.emptyTxn(Read, keys);
+ PreLoadContext blockingTxnContext = PreLoadContext.contextFor(blockingTxnId, keys);
+ for (Node n : cluster)
+ assertEquals(AcceptOutcome.Success, getUninterruptibly(n.unsafeForKey(key).submit(blockingTxnContext, store ->
+ Commands.preaccept(store, store.get(blockingTxnId, homeKey), blockingTxnId, blockingTxnId.epoch(), blockingTxn.slice(store.ranges().allAt(blockingTxnId), true), route, null))));
+
+ // Now create the transaction that should be blocked by the previous one
+ Txn txn = agent.emptyTxn(Write, keys);
+ PreLoadContext context = PreLoadContext.contextFor(txnId, keys);
+ for (Node n : cluster)
+ assertEquals(AcceptOutcome.Success, getUninterruptibly(n.unsafeForKey(key).submit(context, store ->
+ Commands.preaccept(store, store.get(txnId, homeKey), txnId, txnId.epoch(), txn.slice(store.ranges().allAt(txnId.epoch()), true), route, null))));
+
+
+ CoordinateSyncPoint<Ranges> syncInclusiveSyncFuture = getUninterruptibly(CoordinateSyncPoint.inclusive(node, ranges, false));
+ // Shouldn't complete because it is blocked waiting for the dependency just created to apply
+ sleep(500);
+ assertFalse(syncInclusiveSyncFuture.isDone());
+
+ // Async sync should return a result immediately since we are going to wait on the sync point transaction that was created by the sync point
+ CoordinateSyncPoint<Ranges> asyncInclusiveSyncFuture = getUninterruptibly(CoordinateSyncPoint.inclusive(node, ranges, true));
+ SyncPoint<Ranges> localSyncPoint = getUninterruptibly(asyncInclusiveSyncFuture);
+ Semaphore localSyncOccurred = new Semaphore(0);
+ node.commandStores().ifLocal(PreLoadContext.contextFor(localSyncPoint.syncId), homeKey, epoch, epoch, safeStore ->
+ safeStore.get(localSyncPoint.syncId, homeKey.toUnseekable()).addAndInvokeListener(
+ safeStore,
+ commandListener((safeStore2, command) -> {
+ if (command.current().hasBeen(Applied))
+ localSyncOccurred.release();
+ })
+ )
+ ).begin(agent);
+
+ // Move to preapplied in order to test that Barrier will find the transaction and add a listener
+ for (Node n : cluster)
+ getUninterruptibly(n.unsafeForKey(key).execute(context, store -> {
+ SafeCommand safeCommand = store.get(txnId, homeKey);
+ Command command = safeCommand.current();
+ PartialDeps.Builder depsBuilder = PartialDeps.builder(store.ranges().currentRanges());
+ depsBuilder.add(key, blockingTxnId);
+ PartialDeps partialDeps = depsBuilder.build();
+ Commands.commit(store, safeCommand, txnId, route, null, command.partialTxn(), txnId, partialDeps);
+ Commands.apply(store, safeCommand, txnId, route, null, txnId, partialDeps, command.partialTxn(), txn.execute(txnId, txnId, null), txn.query().compute(txnId, txnId, keys, null, null, null));
+ }));
+
+ Barrier<Keys> listeningLocalBarrier = Barrier.barrier(node, Keys.of(key), node.epoch(), BarrierType.local);
+ // Wait and make sure the existing transaction check worked and there is no coordinate sync point created
+ Thread.sleep(500);
+ assertNull(listeningLocalBarrier.coordinateSyncPoint);
+ assertNotNull(listeningLocalBarrier.existingTransactionCheck);
+ assertEquals(txnId, getUninterruptibly(listeningLocalBarrier.existingTransactionCheck).executeAt);
+ assertFalse(listeningLocalBarrier.isDone());
+
+ // Apply the blockingTxn to unblock the rest
+ for (Node n : cluster)
+ assertEquals(ApplyOutcome.Success, getUninterruptibly(n.unsafeForKey(key).submit(blockingTxnContext, store -> {
+ return Commands.apply(store, store.get(blockingTxnId, homeKey), blockingTxnId, route, null, blockingTxnId, PartialDeps.builder(store.ranges().allAt(blockingTxnId.epoch())).build(), blockingTxn.slice(store.ranges().allAt(blockingTxnId.epoch()), true), blockingTxn.execute(blockingTxnId, blockingTxnId, null), blockingTxn.query().compute(blockingTxnId, blockingTxnId, keys, null, null, null));
+ })));
+ // Global sync should be unblocked
+ syncPoint = getUninterruptibly(syncInclusiveSyncFuture);
+ assertEquals(node.epoch(), syncPoint.syncId.epoch());
+ // Command listener for local sync transaction should get notified
+ assertTrue(localSyncOccurred.tryAcquire(5, TimeUnit.SECONDS));
+ // Listening local barrier should have succeeded in waiting on the local transaction that just applied
+ assertEquals(getUninterruptibly(listeningLocalBarrier), getUninterruptibly(listeningLocalBarrier.existingTransactionCheck).executeAt);
+ assertEquals(txnId, getUninterruptibly(listeningLocalBarrier));
+ }
+ finally
+ {
+ SimpleProgressLog.PAUSE_FOR_TEST = false;
+ }
+ }
+
+ @Test
void slowPathTest() throws Throwable
{
try (MockCluster cluster = MockCluster.builder().nodes(7).replication(7).build())
@@ -120,11 +310,11 @@
cluster.networkFilter.isolate(ids(5, 7));
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
Txn txn = writeTxn(keys(10));
Result result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
}
}
@@ -134,7 +324,7 @@
txnId = new TxnId(txnId.epoch(), txnId.hlc() + clock, Write, Key, txnId.node);
Txn txn = writeTxn(keys);
Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, node.computeRoute(txnId, txn.keys())));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
return txnId;
}
@@ -144,7 +334,7 @@
try (MockCluster cluster = MockCluster.builder().nodes(6).maxKey(600).build())
{
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
TxnId txnId1 = coordinate(node, 100, keys(50, 350, 550));
TxnId txnId2 = coordinate(node, 150, keys(250, 350, 450));
@@ -160,12 +350,12 @@
cluster.networkFilter.isolate(ids(5, 7));
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
Keys keys = keys(10);
Txn txn = new Txn.InMemory(keys, MockStore.read(Keys.EMPTY), MockStore.QUERY, MockStore.update(keys));
Result result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
}
}
@@ -177,12 +367,12 @@
cluster.networkFilter.isolate(ids(5, 7));
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
Keys keys = keys(10);
Txn txn = new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY, MockStore.update(Keys.EMPTY));
Result result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
}
}
@@ -192,18 +382,36 @@
try (MockCluster cluster = MockCluster.builder().build())
{
Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ assertNotNull(node);
TxnId txnId = node.nextTxnId(Write, Key);
Keys oneKey = keys(10);
Keys twoKeys = keys(10, 20);
Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), MockStore.QUERY, MockStore.update(twoKeys));
Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn, txn.keys().toRoute(oneKey.get(0).toUnseekable())));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), MockStore.QUERY, MockStore.update(Keys.EMPTY));
result = getUninterruptibly(cluster.get(id(1)).coordinate(txn));
- Assertions.assertEquals(MockStore.RESULT, result);
+ assertEquals(MockStore.RESULT, result);
}
}
+
+ private static Command.TransientListener commandListener(BiConsumer<SafeCommandStore, SafeCommand> listener)
+ {
+ return new TransientListener()
+ {
+ @Override
+ public void onChange(SafeCommandStore safeStore, SafeCommand command)
+ {
+ listener.accept(safeStore, command);
+ }
+
+ @Override
+ public PreLoadContext listenerPreLoadContext(TxnId caller)
+ {
+ return PreLoadContext.contextFor(caller);
+ }
+ };
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java
index 908108d..d3e4832 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -18,17 +18,33 @@
package accord.impl;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Agent;
+import accord.api.Result;
import accord.impl.mock.MockStore;
import accord.local.Command;
import accord.local.Node;
-import accord.api.Agent;
-import accord.api.Result;
-import accord.primitives.*;
-
-import java.util.concurrent.TimeUnit;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
public class TestAgent implements Agent
{
+ private static final Logger logger = LoggerFactory.getLogger(TestAgent.class);
+
+ public static final ConcurrentMap<Timestamp, AtomicInteger> completedLocalBarriers = new ConcurrentHashMap<>();
+
public static class RethrowAgent extends TestAgent
{
@Override
@@ -82,6 +98,7 @@
@Override
public void onUncaughtException(Throwable t)
{
+ logger.error("Uncaught exception", t);
}
@Override
@@ -100,4 +117,9 @@
{
return new Txn.InMemory(kind, keysOrRanges, MockStore.read(Keys.EMPTY), MockStore.QUERY, null);
}
+
+ @Override
+ public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull Timestamp executeAt) {
+ completedLocalBarriers.computeIfAbsent(executeAt, ignored -> new AtomicInteger()).incrementAndGet();
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index 239d355..9b90ddf 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -20,17 +20,19 @@
import java.util.Map;
-import accord.api.Read;
-import accord.api.Update;
-import accord.local.Node.Id;
import accord.api.Data;
import accord.api.Key;
import accord.api.Query;
+import accord.api.Read;
import accord.api.Result;
+import accord.api.Update;
+import accord.local.Node.Id;
import accord.primitives.Keys;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Timestamped;
+import javax.annotation.Nonnull;
public class ListQuery implements Query
{
@@ -44,7 +46,7 @@
}
@Override
- public Result compute(TxnId txnId, Timestamp executeAt, Data data, Read untypedRead, Update update)
+ public Result compute(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Seekables<?, ?> keys, Data data, Read untypedRead, Update update)
{
if (data == null)
return new ListResult(client, requestId, txnId, Keys.EMPTY, Keys.EMPTY, new int[0][0], (ListUpdate) update);
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index 8fa17d7..bae14b5 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -43,7 +43,7 @@
};
public static final Result RESULT = new Result() {};
- public static final Query QUERY = (txnId, executeAt, data, read, update) -> RESULT;
+ public static final Query QUERY = (txnId, executeAt, keys, data, read, update) -> RESULT;
public static final Write WRITE = (key, commandStore, executeAt, store, command) -> Writes.SUCCESS;
public static Read read(Seekables<?, ?> keys)
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
index cf0621b..e75e3d9 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
@@ -19,15 +19,17 @@
package accord.maelstrom;
import java.util.Map;
+import javax.annotation.Nonnull;
-import accord.api.Read;
-import accord.api.Update;
-import accord.local.Node;
-import accord.local.Node.Id;
import accord.api.Data;
import accord.api.Key;
import accord.api.Query;
+import accord.api.Read;
import accord.api.Result;
+import accord.api.Update;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -43,7 +45,7 @@
}
@Override
- public Result compute(TxnId txnId, Timestamp executeAt, Data data, Read untypedRead, Update update)
+ public Result compute(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Seekables<?, ?> keys, Data data, Read untypedRead, Update update)
{
MaelstromRead read = (MaelstromRead) untypedRead;
Value[] values = new Value[read.readKeys.size()];