Split AsyncChain and AsyncResult; normalise AsyncResult with C* Future
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20906
diff --git a/.gitmodules b/.gitmodules
index a0aed84..616dacf 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
- url = https://github.com/belliottsmith/cassandra-accord.git
- branch = 20905-ssverifier
+ url = https://github.com/apache/cassandra-accord.git
+ branch = trunk
diff --git a/modules/accord b/modules/accord
index ff4119f..78b84b0 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit ff4119fa45ad9700cfa94cd50b5cd4af9dd0b7a4
+Subproject commit 78b84b08e13530722cb785a3b748fd2075c1c449
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
index 9bf1595..9705fab 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -229,7 +229,7 @@
long deadlineNanos = startedAtNanos + timeoutNanos;
// TODO (expected): use the source bounds for the streams to avoid waiting unnecessarily long
AccordService.getBlocking(accordService.maxConflict(accordRanges)
- .flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote, timeoutNanos, NANOSECONDS))
+ .flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote, timeoutNanos, NANOSECONDS).chain())
, accordRanges, new LatencyRequestBookkeeping(cfs.metric.accordPostStreamRepair), startedAtNanos, deadlineNanos);
}
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index e69891c..87029aa 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -146,7 +146,6 @@
import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
-import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -368,14 +367,14 @@
List<Entry> cfks = new CopyOnWriteArrayList<>();
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query");
CommandStores commandStores = AccordService.instance().node().commandStores();
- getBlockingAndRethrow(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+ AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
SafeCommandsForKey safeCfk = safeStore.get(key);
CommandsForKey cfk = safeCfk.current();
if (cfk == null)
return;
cfks.add(new Entry(safeStore.commandStore().id(), cfk));
- }).beginAsResult());
+ }));
if (cfks.isEmpty())
return null;
@@ -478,7 +477,7 @@
List<Entry> cfks = new CopyOnWriteArrayList<>();
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query");
CommandStores commandStores = AccordService.instance().node().commandStores();
- getBlockingAndRethrow(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+ AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
SafeCommandsForKey safeCfk = safeStore.get(key);
CommandsForKey cfk = safeCfk.current();
if (cfk == null)
@@ -1469,7 +1468,7 @@
{
consumer.accept(command.route(), result);
}
- return result;
+ return result.chain();
});
}
@@ -1504,12 +1503,10 @@
private void run(TxnId txnId, int commandStoreId, Function<SafeCommandStore, AsyncChain<Void>> apply)
{
AccordService accord = (AccordService) AccordService.instance();
- AsyncChains.awaitUninterruptibly(accord.node()
- .commandStores()
- .forId(commandStoreId)
- .submit(PreLoadContext.contextFor(txnId, TXN_OPS), apply)
- .flatMap(i -> i)
- .beginAsResult());
+ AccordService.getBlocking(accord.node()
+ .commandStores()
+ .forId(commandStoreId)
+ .chain(PreLoadContext.contextFor(txnId, TXN_OPS), apply));
}
private void cleanup(TxnId txnId, int commandStoreId, Cleanup cleanup)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 5a73e8a..5513805 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -22,7 +22,6 @@
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -38,6 +37,7 @@
import org.slf4j.LoggerFactory;
import accord.api.Agent;
+import accord.api.AsyncExecutor;
import accord.api.DataStore;
import accord.api.Journal;
import accord.api.LocalListeners;
@@ -247,7 +247,7 @@
// TODO (desired): we use this for executing callbacks with mutual exclusivity,
// but we don't need to block the actual CommandStore - could quite easily
// inflate a separate queue dynamically in AccordExecutor
- public Executor taskExecutor()
+ public AsyncExecutor taskExecutor()
{
return exclusiveExecutor;
}
@@ -336,25 +336,30 @@
return lastSystemTimestampMicros;
}
@Override
- public <T> AsyncChain<T> build(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
+ public <T> AsyncChain<T> chain(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
{
return AccordTask.create(this, loadCtx, function).chain();
}
@Override
- public <T> AsyncChain<T> build(Callable<T> task)
- {
- return AsyncChains.ofCallable(taskExecutor(), task);
- }
-
- @Override
- public AsyncChain<Void> build(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
+ public AsyncChain<Void> chain(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
{
return AccordTask.create(this, preLoadContext, consumer).chain();
}
- public AccordSafeCommandStore begin(AccordTask<?> operation,
- @Nullable CommandsForRanges commandsForRanges)
+ @Override
+ public <T> AsyncChain<T> chain(Callable<T> call)
+ {
+ return taskExecutor().chain(call);
+ }
+
+ @Override
+ public void execute(Runnable run)
+ {
+ taskExecutor().execute(run);
+ }
+
+ public AccordSafeCommandStore begin(AccordTask<?> operation, @Nullable CommandsForRanges commandsForRanges)
{
require(current == null);
current = AccordSafeCommandStore.create(operation, commandsForRanges, this);
@@ -495,7 +500,7 @@
}
}
- ready.begin((success, fail) -> {
+ ready.invoke((success, fail) -> {
if (fail != null)
{
logger.error("{}: failed to ensure durability of {} ({})", this, ranges, reportId, fail);
@@ -541,7 +546,7 @@
if (onlyNonDurable && !maybeShouldReplay(txnId))
return AsyncChains.success(null);
- return store.submit(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
+ return store.chain(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
if (onlyNonDurable && !shouldReplay(txnId, safeStore.unsafeGet(txnId).current().participants()))
return null;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 0049e71..5dc0d19 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -652,7 +652,7 @@
public Future<Void> unsafeLocalSyncNotified(long epoch)
{
AsyncPromise<Void> promise = new AsyncPromise<>();
- getOrCreateEpochState(epoch).localSyncNotified().begin((result, failure) -> {
+ getOrCreateEpochState(epoch).localSyncNotified().invoke((result, failure) -> {
if (failure != null) promise.tryFailure(failure);
else promise.trySuccess(result);
});
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index b026168..3f91882 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -56,6 +56,10 @@
import accord.utils.TriConsumer;
import accord.utils.TriFunction;
import accord.utils.UnhandledEnum;
+import accord.utils.async.AsyncCallbacks.CallAndCallback;
+import accord.utils.async.AsyncCallbacks.FlatCallAndCallback;
+import accord.utils.async.AsyncCallbacks.RunAndCallback;
+import accord.utils.async.AsyncCallbacks.RunOrFail;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.Cancellable;
@@ -430,16 +434,10 @@
}
@Override
- public <T> AsyncChain<T> build(Callable<T> task)
+ public Cancellable execute(RunOrFail runOrFail)
{
- return new AsyncChains.Head<>()
- {
- @Override
- protected Cancellable start(BiConsumer<? super T, Throwable> callback)
- {
- return submit(new PlainChain<>(task, callback, null));
- }
- };
+ PlainChain submit = new PlainChain(runOrFail);
+ return submit(submit);
}
public <T> AsyncChain<T> buildDebuggable(Callable<T> task, Object describe)
@@ -449,7 +447,7 @@
@Override
protected Cancellable start(BiConsumer<? super T, Throwable> callback)
{
- return submit(new DebuggableChain<>(task, callback, null, describe));
+ return submit(new DebuggableChain(new CallAndCallback<>(task, callback), null, 0, describe));
}
};
}
@@ -581,7 +579,7 @@
void submitExclusive(Runnable runnable)
{
- submitPlainExclusive(new PlainRunnable(null, runnable, null));
+ submitPlainExclusive(new PlainRunnable(null, runnable));
}
private void submitPlainExclusive(Plain task)
@@ -774,14 +772,14 @@
public Future<?> submit(Runnable run)
{
- PlainRunnable task = new PlainRunnable(new AsyncPromise<>(), run, null);
+ PlainRunnable task = new PlainRunnable(new AsyncPromise<>(), run);
submit(task);
return task.result;
}
public void execute(Runnable command)
{
- submit(new PlainRunnable(null, command, null));
+ submit(new PlainRunnable(null, command));
}
private Cancellable submit(Plain task)
@@ -804,11 +802,6 @@
}
}
- public void execute(Runnable command, AccordCommandStore commandStore)
- {
- submit(new PlainRunnable(null, command, commandStore.exclusiveExecutor));
- }
-
@Override
public void setCapacity(long bytes)
{
@@ -1107,53 +1100,91 @@
}
@Override
- public <T> AsyncChain<T> build(Callable<T> call)
+ public AsyncChain<Void> chain(Runnable run)
{
- int position = inExecutor() && task != null ? task.queuePosition : 0;
+ int position = inheritQueuePosition();
return new AsyncChains.Head<>()
{
@Override
- protected Cancellable start(BiConsumer<? super T, Throwable> callback)
+ protected Cancellable start(BiConsumer<? super Void, Throwable> callback)
{
- PlainChain<T> submit = new PlainChain<>(call, callback, SequentialExecutor.this);
- submit.queuePosition = position;
- return AccordExecutor.this.submit(submit);
+ return execute(new RunAndCallback(run, callback), position);
}
};
}
@Override
+ public <T> AsyncChain<T> chain(Callable<T> call)
+ {
+ int position = inheritQueuePosition();
+ return new AsyncChains.Head<>()
+ {
+ @Override
+ protected Cancellable start(BiConsumer<? super T, Throwable> callback)
+ {
+ return execute(new CallAndCallback<>(call, callback), position);
+ }
+ };
+ }
+
+ @Override
+ public <T> AsyncChain<T> flatChain(Callable<? extends AsyncChain<T>> call)
+ {
+ int position = inheritQueuePosition();
+ return new AsyncChains.Head<>()
+ {
+ @Override
+ protected Cancellable start(BiConsumer<? super T, Throwable> callback)
+ {
+ return execute(new FlatCallAndCallback<>(call, callback), position);
+ }
+ };
+ }
+
+ @Override
+ public Cancellable execute(RunOrFail runOrFail)
+ {
+ return execute(runOrFail, inheritQueuePosition());
+ }
+
+ private int inheritQueuePosition()
+ {
+ return inExecutor() && task != null ? task.queuePosition : 0;
+ }
+
+ private Cancellable execute(RunOrFail runOrFail, int queuePosition)
+ {
+ PlainChain submit = new PlainChain(runOrFail, SequentialExecutor.this, queuePosition);
+ return AccordExecutor.this.submit(submit);
+ }
+
+ @Override
public void execute(Runnable run)
{
- PlainRunnable submit = new PlainRunnable(null, run, this);
- if (inExecutor() && this.task != null)
- submit.queuePosition = this.task.queuePosition;
+ PlainRunnable submit = new PlainRunnable(null, run, this, inheritQueuePosition());
AccordExecutor.this.submit(submit);
}
@Override
- public void maybeExecuteImmediately(Runnable run)
+ public boolean tryExecuteImmediately(Runnable run)
{
Thread self = Thread.currentThread();
Thread owner = this.owner;
- if (owner == self || (owner == null && ownerUpdater.compareAndSet(this, null, self)))
+ if (owner != null ? owner != self : !ownerUpdater.compareAndSet(this, null, self))
+ return false;
+
+ try { run.run(); }
+ catch (Throwable t) { agent.onUncaughtException(t); }
+ finally
{
- try { run.run(); }
- catch (Throwable t) { agent.onUncaughtException(t); }
- finally
+ if (owner == null)
{
- if (owner == null)
- {
- this.owner = null;
- if (waiting != null)
- LockSupport.unpark(waiting);
- }
+ this.owner = null;
+ if (waiting != null)
+ LockSupport.unpark(waiting);
}
}
- else
- {
- execute(run);
- }
+ return true;
}
}
@@ -1287,14 +1318,20 @@
PlainRunnable(Runnable run)
{
- this(null, run, null);
+ this(null, run);
}
- PlainRunnable(AsyncPromise<Void> result, Runnable run, @Nullable SequentialExecutor executor)
+ PlainRunnable(AsyncPromise<Void> result, Runnable run)
+ {
+ this(result, run, null, 0);
+ }
+
+ PlainRunnable(AsyncPromise<Void> result, Runnable run, @Nullable SequentialExecutor executor, int queuePosition)
{
this.result = result;
this.run = run;
this.executor = executor;
+ this.queuePosition = queuePosition;
}
@Override
@@ -1514,17 +1551,21 @@
}
}
- class PlainChain<T> extends Plain
+ class PlainChain extends Plain
{
- final Callable<T> call;
- final BiConsumer<? super T, Throwable> callback;
+ final RunOrFail runOrFail;
final @Nullable SequentialExecutor executor;
- PlainChain(Callable<T> call, BiConsumer<? super T, Throwable> callback, @Nullable SequentialExecutor executor)
+ PlainChain(RunOrFail runOrFail)
{
- this.call = call;
- this.callback = callback;
+ this(runOrFail, null, 0);
+ }
+
+ PlainChain(RunOrFail runOrFail, @Nullable SequentialExecutor executor, int queuePosition)
+ {
+ this.runOrFail = runOrFail;
this.executor = executor;
+ this.queuePosition = queuePosition;
}
@Override
@@ -1536,23 +1577,14 @@
@Override
protected void runInternal()
{
- T success;
try (Closeable close = locals.get())
{
- success = call.call();
- }
- catch (Throwable t)
- {
- fail(t);
- return;
- }
- try
- {
- callback.accept(success, null);
+ runOrFail.run();
}
catch (Throwable t)
{
agent.onUncaughtException(t);
+ return;
}
}
@@ -1561,7 +1593,7 @@
{
try
{
- callback.accept(null, fail);
+ runOrFail.fail(fail);
}
catch (Throwable t)
{
@@ -1571,15 +1603,15 @@
}
}
- class DebuggableChain<T> extends PlainChain<T> implements DebuggableTask
+ class DebuggableChain extends PlainChain implements DebuggableTask
{
final long createdAtNanos;
long startedAtNanos;
final Object describe;
- DebuggableChain(Callable<T> call, BiConsumer<? super T, Throwable> callback, @Nullable SequentialExecutor executor, Object describe)
+ DebuggableChain(RunOrFail runOrFail, @Nullable SequentialExecutor executor, int queuePosition, Object describe)
{
- super(call, callback, executor);
+ super(runOrFail, executor, queuePosition);
this.createdAtNanos = MonotonicClock.Global.approxTime.now();
this.describe = Invariants.nonNull(describe);
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 96b9336..058b917 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nonnull;
@@ -41,6 +42,7 @@
import org.apache.cassandra.metrics.AccordReplicaMetrics;
import org.apache.cassandra.service.accord.api.AccordViolationHandler;
import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +135,6 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.Promise;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static accord.api.Journal.TopologyUpdate;
@@ -553,7 +554,7 @@
}
@Override
- public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
+ public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
{
return node.durability().sync(requestedBy, ExclusiveSyncPoint, minBound, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
}
@@ -580,8 +581,8 @@
return node.withEpochAtLeast(txnId.epoch(), null, () -> {
Txn txn = new Txn.InMemory(Write, keys, TxnRead.createNoOpRead(keys), TxnQuery.UNSAFE_EMPTY, TxnUpdate.empty(), new TableMetadatasAndKeys(TableMetadatas.none(), keys));
return CoordinateTransaction.coordinate(node, route, txnId, txn)
- .map(ignore -> (Void) null).beginAsResult();
- }).beginAsResult();
+ .mapToNull();
+ });
}
@Override
@@ -590,6 +591,53 @@
return CoordinateMaxConflict.maxConflict(node, ranges);
}
+ static class AsyncFutureCallback<V> extends AsyncFuture<V> implements BiConsumer<V, Throwable>
+ {
+ @Override
+ public void accept(V v, Throwable fail)
+ {
+ if (fail == null) trySuccess(v);
+ else tryFailure(fail);
+ }
+ }
+
+ public static <V> Future<V> toFuture(AsyncChain<V> chain)
+ {
+ AsyncFutureCallback<V> future = new AsyncFutureCallback<>();
+ chain.begin(future);
+ return future;
+ }
+
+ public static <V> Future<V> toFuture(AsyncResult<V> result)
+ {
+ if (result instanceof Future<?>)
+ return (Future<V>) result;
+
+ AsyncPromise<V> promise = new AsyncPromise<>();
+ result.invoke((success, failure) -> {
+ if (failure == null) promise.trySuccess(success);
+ else promise.tryFailure(failure);
+ });
+ return promise;
+ }
+
+ public static <V> V getBlocking(AsyncChain<V> async)
+ {
+ return syncAndRethrow(toFuture(async)).getNow();
+ }
+
+ public static <V> V getBlocking(AsyncResult<V> async)
+ {
+ return syncAndRethrow(toFuture(async)).getNow();
+ }
+
+ private static <V> Future<V> syncAndRethrow(Future<V> future)
+ {
+ future.syncThrowUncheckedOnInterrupt()
+ .rethrowIfFailed();
+ return future;
+ }
+
public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest)
{
return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest);
@@ -602,26 +650,23 @@
return result.awaitAndGet();
}
+ public static <V> V getBlocking(AsyncResult<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest)
+ {
+ return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest);
+ }
+
+ public static <V> V getBlocking(AsyncResult<V> async, @Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest)
+ {
+ AccordResult<V> result = new AccordResult<>(txnId, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest);
+ async.invoke(result);
+ return result.awaitAndGet();
+ }
+
public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline)
{
return getBlocking(async, keysOrRanges, bookkeeping, startedAt, deadline, false);
}
- public static <V> V getBlocking(AsyncChain<V> async)
- {
- return asPromise(async).syncUninterruptibly().getNow();
- }
-
- public static <V> Promise<V> asPromise(AsyncChain<V> async)
- {
- AsyncPromise<V> promise = new AsyncPromise<>();
- async.begin((result, failure) -> {
- if (failure == null) promise.trySuccess(result);
- else promise.tryFailure(failure);
- });
- return promise;
- }
-
public static Keys intersecting(Keys keys)
{
if (keys.isEmpty())
@@ -719,7 +764,7 @@
bookkeeping.metrics.keySize.update(txn.keys().size());
long deadlineNanos = requestTime.computeDeadline(timeout);
AccordResult<TxnResult> result = new AccordResult<>(txnId, txn.keys(), bookkeeping, requestTime.startedAtNanos(), deadlineNanos, true);
- ((AsyncResult)node.coordinate(txnId, txn, minEpoch, deadlineNanos)).begin(result);
+ node.coordinate(txnId, txn, minEpoch, deadlineNanos).begin((BiConsumer) result);
return result;
}
@@ -754,7 +799,7 @@
}
Ready ready = new Ready();
AccordCommandStores commandStores = (AccordCommandStores) node.commandStores();
- AsyncChains.getBlockingAndRethrow(commandStores.forEach((PreLoadContext.Empty)() -> "Flush Caches", safeStore -> {
+ getBlocking(commandStores.forEach((PreLoadContext.Empty)() -> "Flush Caches", safeStore -> {
AccordCommandStore commandStore = (AccordCommandStore)safeStore.commandStore();
try (AccordCommandStore.ExclusiveCaches caches = commandStore.lockCaches())
{
@@ -769,7 +814,7 @@
}));
ready.decrement();
AsyncPromise<Void> result = new AsyncPromise<>();
- ready.begin((success, fail) -> {
+ ready.invoke((success, fail) -> {
if (fail != null) result.tryFailure(fail);
else result.trySuccess(null);
});
@@ -839,19 +884,7 @@
@Override
public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId)
{
- AsyncChain<List<CommandStoreTxnBlockedGraph>> states = loadDebug(txnId);
- try
- {
- return AsyncChains.getBlocking(states);
- }
- catch (InterruptedException e)
- {
- throw new UncheckedInterruptedException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e.getCause());
- }
+ return getBlocking(loadDebug(txnId));
}
public AsyncChain<List<CommandStoreTxnBlockedGraph>> loadDebug(TxnId original)
@@ -862,11 +895,11 @@
int[] ids = commandStores.ids();
List<AsyncChain<CommandStoreTxnBlockedGraph>> chains = new ArrayList<>(ids.length);
for (int id : ids)
- chains.add(loadDebug(original, commandStores.forId(id)));
+ chains.add(loadDebug(original, commandStores.forId(id)).chain());
return AsyncChains.allOf(chains);
}
- private AsyncChain<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, CommandStore store)
+ private AsyncResult<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, CommandStore store)
{
CommandStoreTxnBlockedGraph.Builder state = new CommandStoreTxnBlockedGraph.Builder(store.id());
populateAsync(state, store, txnId);
@@ -995,7 +1028,7 @@
@Override
public void ensureMinHlc(long minHlc)
{
- asPromise(node.updateMinHlc(minHlc >= 0 ? minHlc : 0)).syncUninterruptibly();
+ toFuture(node.updateMinHlc(minHlc >= 0 ? minHlc : 0)).syncUninterruptibly();
}
public AccordJournal journal()
@@ -1006,7 +1039,7 @@
@Override
public Future<Void> epochReady(Epoch epoch)
{
- return asPromise(configService.epochReady(epoch.getEpoch()));
+ return toFuture(configService.epochReady(epoch.getEpoch()));
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index c6e754b..34453d2 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -28,6 +28,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import accord.utils.async.AsyncResult;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@
IVerbHandler<? extends Request> requestHandler();
IVerbHandler<? extends Reply> responseHandler();
- AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits);
+ AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits);
AsyncChain<Void> sync(@Nullable Timestamp minBound, Keys keys, SyncLocal syncLocal, SyncRemote syncRemote);
AsyncChain<Timestamp> maxConflict(Ranges ranges);
@@ -214,7 +215,7 @@
}
@Override
- public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
+ public AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
{
throw new UnsupportedOperationException("No accord transaction should be executed when accord.enabled = false in cassandra.yaml");
}
@@ -410,7 +411,7 @@
}
@Override
- public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
+ public AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
{
return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
}
diff --git a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
index e015a33..7626ee9 100644
--- a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
@@ -33,11 +33,11 @@
public static final ImmediateAsyncExecutor INSTANCE = new ImmediateAsyncExecutor();
@Override
- public <T> AsyncChain<T> build(Callable<T> task)
+ public <T> AsyncChain<T> chain(Callable<T> call)
{
try
{
- return AsyncChains.success(task.call());
+ return AsyncChains.success(call.call());
}
catch (Throwable t)
{
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 9204770..be7153f 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -58,8 +58,7 @@
import accord.utils.UnhandledEnum;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
+import accord.utils.async.Cancellable;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.RequestTimeoutException;
import org.apache.cassandra.metrics.AccordCoordinatorMetrics;
@@ -392,9 +391,15 @@
return AsyncChains.success(staleId);
logger.debug("Waiting {} micros for {} to be stale", waitMicros, staleId);
- AsyncResult.Settable<TxnId> result = AsyncResults.settable();
- node.scheduler().once(() -> result.setSuccess(staleId), waitMicros, MICROSECONDS);
- return result;
+ return new AsyncChains.Head<>()
+ {
+ @Override
+ protected @Nullable Cancellable start(BiConsumer<? super TxnId, Throwable> callback)
+ {
+ node.scheduler().once(() -> callback.accept(staleId, null), waitMicros, MICROSECONDS);
+ return null;
+ }
+ };
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index 72ac8d7..597294c 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiConsumer;
+import accord.api.AsyncExecutor;
import accord.api.Data;
import accord.api.Result;
import accord.coordinate.CoordinationAdapter;
@@ -257,7 +258,7 @@
}
Group group = Group.one(command);
- results.add(AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+ results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
TxnData result = new TxnData();
// Enforcing limits is redundant since we only have a group of size 1, but checking anyways
// documents the requirement here
@@ -293,7 +294,7 @@
// TODO (required): To make migration work we need to validate that the range is all on Accord
- results.add(AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+ results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
TxnData result = new TxnData();
try (PartitionIterator iterator = StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))
{
@@ -395,7 +396,7 @@
private AsyncChain<Data> executeUnrecoverableRepairUpdate()
{
- return AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+ return AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
UnrecoverableRepairUpdate repairUpdate = (UnrecoverableRepairUpdate)txn.update();
// TODO (expected): We should send the read in the same message as the commit. This requires refactor ReadData.Kind so that it doesn't specify the ordinal encoding
// and can be extended similar to MessageType which allows additional types not from Accord to be added
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
index 4f94cf2..51f9360 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
@@ -29,6 +29,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import accord.api.AsyncExecutor;
import accord.api.Data;
import accord.local.Node;
import accord.local.SafeCommandStore;
@@ -282,7 +283,7 @@
return AsyncChains.success(new LocalReadData(new ArrayList<>(), readCommand));
ReadCommand submit = readCommand.withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()), nowInSeconds);
- return AsyncChains.ofCallable(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
+ return AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
}
// This path can have a subrange we have never seen before provided by short read protection or read repair so we need to
@@ -297,7 +298,7 @@
continue;
ReadCommand submit = TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) command, intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds);
TokenKey routingKey = ((TokenRange)r).start();
- chains.add(AsyncChains.ofCallable(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
+ chains.add(AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
}
if (chains.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
index 1a9772f..d080386 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import javax.annotation.Nullable;
+import accord.api.AsyncExecutor;
import accord.api.Data;
import accord.local.Node;
import accord.local.SafeCommandStore;
@@ -34,7 +35,6 @@
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadRepairVerbHandler;
import org.apache.cassandra.db.TypeSizes;
@@ -110,11 +110,8 @@
private static final IVersionedSerializer<Data> noop_data_serializer = new IVersionedSerializer<>()
{
- @Override
- public void serialize(Data t, DataOutputPlus out, Version version) throws IOException {}
- @Override
- public Data deserialize(DataInputPlus in, Version version) throws IOException { return Data.NOOP_DATA; }
-
+ @Override public void serialize(Data t, DataOutputPlus out, Version version) {}
+ @Override public Data deserialize(DataInputPlus in, Version version) { return Data.NOOP_DATA; }
public long serializedSize(Data t, Version version) { return 0; }
};
@@ -148,7 +145,7 @@
protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Participants<?> execute)
{
// TODO (required): subtract unavailable ranges, either from read or from response (or on coordinator)
- return AsyncChains.ofCallable(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
+ return AsyncExecutor.chain(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
ReadRepairVerbHandler.instance.applyMutation(mutation);
return Data.NOOP_DATA;
});
diff --git a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
index 4cddc7c..996c822 100644
--- a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
@@ -164,7 +164,7 @@
long timeoutNanos = getAccordRepairTimeoutNanos();
long maxHlc = AccordService.getBlocking(service.maxConflict(ranges).flatMap(conflict -> {
Timestamp conflictMax = mergeMax(conflict, minForEpoch(this.minEpoch.getEpoch()));
- return service.sync("[repairId #" + repairId + ']', conflictMax, Ranges.of(range), null, NoLocal, syncRemote, timeoutNanos, NANOSECONDS).map(ignored -> conflictMax.hlc());
+ return service.sync("[repairId #" + repairId + ']', conflictMax, Ranges.of(range), null, NoLocal, syncRemote, timeoutNanos, NANOSECONDS).map(ignored -> conflictMax.hlc()).chain();
}), ranges, bookkeeping, start, start + timeoutNanos);
waiting = null;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 6160232..a9060c5 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -35,7 +35,7 @@
import accord.primitives.Timestamp;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncResults;
+import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.PartitionRangeReadCommand;
@@ -231,7 +231,7 @@
{
ReadCommand command = deserialize(tables);
if (command == null)
- return AsyncResults.success(new TxnData());
+ return AsyncChains.success(new TxnData());
// It's fine for our nowInSeconds to lag slightly our insertion timestamp, as to the user
// this simply looks like the transaction witnessed TTL'd data and the data then expired
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index 83a287b..e8e70d7 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -27,11 +27,11 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.AsyncExecutor;
import accord.api.Write;
import accord.local.CommandStore;
import accord.local.SafeCommandStore;
@@ -138,13 +138,13 @@
'}';
}
- public AsyncChain<Void> write(Executor executor, TableMetadatas tables, boolean preserveTimestamps, long timestamp)
+ public AsyncChain<Void> write(AsyncExecutor executor, TableMetadatas tables, boolean preserveTimestamps, long timestamp)
{
PartitionUpdate update = deserialize(tables);
if (!preserveTimestamps)
update = new PartitionUpdate.Builder(update, 0).updateAllTimestamp(timestamp).build();
Mutation mutation = new Mutation(update, PotentialTxnConflicts.ALLOW);
- return AsyncChains.ofRunnable(executor, () -> mutation.apply(false, false));
+ return executor.chain(() -> mutation.apply(false, false));
}
@Override
@@ -479,9 +479,9 @@
return AsyncChains.success(null);
if (results.size() == 1)
- return results.get(0).map(o -> null);
+ return results.get(0).mapToNull();
- return AsyncChains.reduce(results, (i1, i2) -> null, (Void)null).map(ignore -> null);
+ return AsyncChains.reduce(results, (i1, i2) -> null, null);
}
public long estimatedSizeOnHeap()
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
index 7cf577d..dca31cc 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
@@ -34,6 +34,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
+import accord.utils.async.AsyncResult;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.ThrowableUtil;
import org.apache.cassandra.utils.concurrent.ListenerList.CallbackBiConsumerListener;
@@ -317,17 +318,6 @@
}
/**
- * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
- *
- * See {@link #addListener(GenericFutureListener)} for ordering semantics.
- */
- @Override
- public <T> Future<T> map(Function<? super V, ? extends T> mapper)
- {
- return map(mapper, null);
- }
-
- /**
* Support more fluid version of {@link com.google.common.util.concurrent.Futures#addCallback}
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
@@ -366,12 +356,12 @@
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
- protected <T> Future<T> flatMap(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
+ protected <T> Future<T> flatMap(AbstractFuture<T> result, Function<? super V, ? extends AsyncResult<T>> flatMapper, @Nullable Executor executor)
{
addListener(() -> {
try
{
- if (isSuccess()) flatMapper.apply(getNow()).addListener(propagate(result));
+ if (isSuccess()) flatMapper.apply(getNow()).invoke(propagateAsConsumer(result));
else result.tryFailure(cause());
}
catch (Throwable t)
@@ -404,7 +394,7 @@
addListener(() -> {
try
{
- if (isSuccess()) andThen.apply(getNow()).addListener(propagate(result));
+ if (isSuccess()) andThen.apply(getNow()).addListener(propagateAsListener(result));
else result.tryFailure(cause());
}
catch (Throwable t)
@@ -553,11 +543,22 @@
/**
* @return a listener that will propagate to {@code to} the result of the future it is invoked with
*/
- private static <V> GenericFutureListener<? extends Future<V>> propagate(AbstractFuture<? super V> to)
+ private static <V> GenericFutureListener<? extends Future<V>> propagateAsListener(AbstractFuture<? super V> to)
{
return from -> {
if (from.isSuccess()) to.trySuccess(from.getNow());
else to.tryFailure(from.cause());
};
}
+
+ /**
+ * @return a listener that will propagate to {@code to} the result of the future it is invoked with
+ */
+ private static <V> BiConsumer<? super V, Throwable> propagateAsConsumer(AbstractFuture<? super V> to)
+ {
+ return (success, fail) -> {
+ if (fail == null) to.trySuccess(success);
+ else to.tryFailure(fail);
+ };
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
index 3e5052e..e381172 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
@@ -27,6 +27,7 @@
import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.cassandra.utils.concurrent.ListenerList.Waiting;
@@ -146,7 +147,7 @@
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@Override
- public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
+ public <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, @Nullable Executor executor)
{
return flatMap(new AsyncFuture<>(), flatMapper, executor);
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index c5fc022..b8bf62e 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -27,6 +27,8 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
+import accord.api.AsyncExecutor;
+import accord.utils.async.AsyncResult;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.PlatformDependent;
@@ -41,7 +43,7 @@
* integrating also with our {@link Awaitable} abstraction, to overall improve coherency and clarity in the codebase.
*/
@Shared(scope = SIMULATION, ancestors = INTERFACES)
-public interface Future<V> extends io.netty.util.concurrent.Future<V>, ListenableFuture<V>, Awaitable
+public interface Future<V> extends io.netty.util.concurrent.Future<V>, ListenableFuture<V>, Awaitable, AsyncResult<V>
{
/**
* Wait indefinitely for this future to complete, throwing any interrupt
@@ -123,6 +125,8 @@
return awaitUninterruptibly(l, MILLISECONDS);
}
+ default Future<V> invoke(BiConsumer<? super V, Throwable> callback) { return addCallback(callback); }
+
/**
* Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
*/
@@ -158,7 +162,12 @@
*/
default <T> Future<T> map(Function<? super V, ? extends T> mapper)
{
- return map(mapper, null);
+ return map(mapper, (Executor) null);
+ }
+
+ default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper, AsyncExecutor executor)
+ {
+ return map(mapper, (Executor) executor);
}
/**
@@ -169,15 +178,20 @@
/**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
*/
- default <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper)
+ default <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper)
{
- return flatMap(flatMapper, null);
+ return flatMap(flatMapper, (Executor) null);
+ }
+
+ default <T> AsyncResult<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, AsyncExecutor executor)
+ {
+ return flatMap(flatMapper, (Executor) executor);
}
/**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
*/
- <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, Executor executor);
+ <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, Executor executor);
/**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
index 287911b..6098f8d 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
@@ -26,6 +26,7 @@
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
+import accord.utils.async.AsyncResult;
import io.netty.util.concurrent.GenericFutureListener;
import static org.apache.cassandra.utils.concurrent.Awaitable.SyncAwaitable.waitUntil;
@@ -106,7 +107,7 @@
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@Override
- public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
+ public <T> Future<T> flatMap(Function<? super V, ? extends AsyncResult<T>> flatMapper, @Nullable Executor executor)
{
return flatMap(new SyncFuture<>(), flatMapper, executor);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index 696c351..d30c3d8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -62,10 +62,10 @@
import org.apache.cassandra.utils.FBUtilities;
import org.assertj.core.api.Assertions;
-import static accord.utils.async.AsyncChains.awaitUninterruptiblyAndRethrow;
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
public class AccordBootstrapTest extends TestBaseImpl
{
@@ -278,7 +278,7 @@
Assert.assertTrue(session.getNumKeyspaceTransfers() > 0);
});
- awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
+ getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet()));
Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet()));
@@ -321,7 +321,7 @@
Assert.assertEquals(key, row.getInt("c"));
Assert.assertEquals(key, row.getInt("v"));
- awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
+ getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
if (safeStore.ranges().currentRanges().contains(partitionKey))
{
AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
@@ -464,7 +464,7 @@
PartitionKey partitionKey = new PartitionKey(tableId, dk);
- awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
+ getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
partitionKey.toUnseekable(), moveMax, moveMax,
safeStore -> {
if (!safeStore.ranges().allAt(preMove).contains(partitionKey))
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java
index 945b8d4..a71e779 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java
@@ -30,7 +30,6 @@
import accord.primitives.Routable;
import accord.primitives.Txn;
import accord.primitives.TxnId;
-import accord.utils.async.AsyncChains;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
@@ -48,6 +47,7 @@
import static accord.local.LoadKeysFor.READ_WRITE;
import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
public class AccordDropTableBase extends TestBaseImpl
{
@@ -138,7 +138,7 @@
for (int storeId : stores.ids())
{
AccordCommandStore store = (AccordCommandStore) stores.forId(storeId);
- AsyncChains.getUnchecked(store.submit(ctx, input -> {
+ getBlocking(store.chain(ctx, input -> {
AccordSafeCommandStore safe = (AccordSafeCommandStore) input;
for (RoutingKey key : safe.commandsForKeysKeys())
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
index 7f9fef5..a5a38ed 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -35,7 +35,7 @@
import accord.primitives.Status;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
-import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;
import com.google.common.collect.Iterables;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -72,6 +72,7 @@
import static accord.local.LoadKeysFor.READ_WRITE;
import static java.lang.String.format;
import static org.apache.cassandra.distributed.test.accord.AccordTestBase.executeWithRetry;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
public class AccordIncrementalRepairTest extends TestBaseImpl
{
@@ -92,16 +93,16 @@
return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote, 10L, TimeUnit.MINUTES).map(v -> {
executedBarriers = true;
return v;
- }).beginAsResult();
+ });
}
@Override
- public AsyncResult<Void> sync(@Nullable Timestamp onOrAfter, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote)
+ public AsyncChain<Void> sync(@Nullable Timestamp onOrAfter, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote)
{
return delegate.sync(onOrAfter, keys, syncLocal, syncRemote).map(v -> {
executedBarriers = true;
return v;
- }).beginAsResult();
+ });
}
public void reset()
@@ -206,7 +207,7 @@
{
Node node = accordService().node();
AtomicReference<TxnId> waitFor = new AtomicReference<>(null);
- AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+ getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore;
SafeCommandsForKey safeCfk = store.ifLoadedAndInitialised(key);
if (safeCfk == null)
@@ -228,7 +229,7 @@
long now = Clock.Global.currentTimeMillis();
if (now - start > TimeUnit.MINUTES.toMillis(1))
throw new AssertionError("Timeout");
- AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+ AccordService.getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
SafeCommand command = safeStore.get(txnId, StoreParticipants.empty(txnId));
Assert.assertNotNull(command.current());
if (command.current().status().hasBeen(Status.Applied))
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
index ba7eff1..c9ec047 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
@@ -30,7 +30,6 @@
import accord.local.durability.DurabilityService;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
-import accord.utils.async.AsyncChains;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
@@ -53,6 +52,7 @@
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
import static org.apache.cassandra.service.accord.AccordKeyspace.JOURNAL;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
/**
* This is a specific history of {@link StatefulJournalRestartTest}, which offers a more general way of testing this
@@ -90,7 +90,7 @@
Ranges ranges = Ranges.single(TokenRange.fullRange(metadata.id, metadata.partitioner));
for (int i = 0; i < 10; i++)
{
- AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
+ getBlocking(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
accord.journal().closeCurrentSegmentForTestingIfNonEmpty();
accord.journal().runCompactorForTesting();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java
index 76eeda8..acdd4da 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java
@@ -32,7 +32,6 @@
import accord.primitives.Timestamp;
import accord.utils.Property;
import accord.utils.Property.SimpleCommand;
-import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
@@ -47,6 +46,7 @@
import static accord.utils.Property.stateful;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
import static org.apache.cassandra.service.accord.AccordKeyspace.JOURNAL;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
/**
* There are 2 errors blocking this test from being run
@@ -119,7 +119,7 @@
Ranges ranges = Ranges.single(TokenRange.fullRange(metadata.id, metadata.partitioner));
for (int i = 0; i < 10; i++)
{
- AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
+ getBlocking(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
accord.journal().closeCurrentSegmentForTestingIfNonEmpty();
accord.journal().runCompactorForTesting();
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index bc83665..39b693d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -94,12 +94,12 @@
import static accord.primitives.Routable.Domain.Range;
import static accord.primitives.Timestamp.Flag.HLC_BOUND;
import static accord.primitives.Timestamp.Flag.SHARD_BOUND;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
import static org.apache.cassandra.Util.spinAssertEquals;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
import static org.apache.cassandra.service.accord.AccordKeyspace.COMMANDS_FOR_KEY;
import static org.apache.cassandra.service.accord.AccordKeyspace.CFKAccessor;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -314,32 +314,32 @@
PartialDeps partialDeps = Deps.NONE.intersecting(AccordTestUtils.fullRange(txn));
PartialTxn partialTxn = txn.slice(commandStore.unsafeGetRangesForEpoch().currentRanges(), true);
Route<?> partialRoute = route.slice(commandStore.unsafeGetRangesForEpoch().currentRanges());
- getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ getBlocking(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
CheckedCommands.preaccept(safe, txnId, partialTxn, route, (a, b) -> {});
- }).beginAsResult());
+ }));
flush(commandStore);
- getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ getBlocking(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, txnId, partialDeps, (a, b) -> {});
- }).beginAsResult());
+ }));
flush(commandStore);
- getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ getBlocking(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, txnId, partialDeps, (a, b) -> {});
- }).beginAsResult());
+ }));
flush(commandStore);
- getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ getBlocking(commandStore.chain(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
return AccordTestUtils.processTxnResultDirect(safe, txnId, partialTxn, txnId);
- }).flatMap(i -> i).flatMap(result -> commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ }).flatMap(i -> i).flatMap(result -> commandStore.chain(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
CheckedCommands.apply(safe, txnId, route, txnId, partialDeps, partialTxn, result.left, result.right, (a, b) -> {});
})));
flush(commandStore);
// The apply chain is asychronous, so it is easiest to just spin until it is applied
// in order to have the updated state in the system table
spinAssertEquals(true, 5, () -> {
- return getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ return getBlocking(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
StoreParticipants participants = StoreParticipants.all(route);
Command command = safe.get(txnId, participants).current();
return command.hasBeen(Status.Applied);
- }).beginAsResult());
+ }));
});
flush(commandStore);
}
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 4e63ce3..7f7e44f 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.virtual;
+import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -41,9 +42,10 @@
import accord.primitives.Status.Durability.HasOutcome;
import accord.primitives.Txn;
import accord.primitives.TxnId;
-import accord.utils.async.AsyncChains;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.OptionaldPositiveInt;
+import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
@@ -54,6 +56,7 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.TokenRange;
import org.apache.cassandra.service.accord.api.TokenKey;
@@ -66,6 +69,10 @@
import static accord.api.ProtocolModifiers.Toggles.SendStableMessages.TO_ALL;
import static accord.primitives.TxnId.FastPath.Unoptimised;
import static org.apache.cassandra.Util.spinUntilSuccess;
+import static org.apache.cassandra.net.Verb.ACCORD_APPLY_AND_WAIT_REQ;
+import static org.apache.cassandra.net.Verb.ACCORD_APPLY_REQ;
+import static org.apache.cassandra.net.Verb.ACCORD_BEGIN_RECOVER_REQ;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
public class AccordDebugKeyspaceTest extends CQLTester
@@ -120,15 +127,21 @@
@BeforeClass
public static void setUpClass()
{
+ Config.setOverrideLoadConfig(() -> {
+ Config config = new YamlConfigurationLoader().loadConfig();
+ config.accord.queue_shard_count = new OptionaldPositiveInt(1);
+ config.concurrent_accord_operations = 1;
+ config.accord.command_store_shard_count = new OptionaldPositiveInt(1);
+ config.accord.enable_virtual_debug_only_keyspace = true;
+ return config;
+ });
daemonInitialization();
- DatabaseDescriptor.getAccord().queue_shard_count = new OptionaldPositiveInt(1);
- DatabaseDescriptor.getAccord().command_store_shard_count = new OptionaldPositiveInt(1);
ProtocolModifiers.Toggles.setSendStableMessages(TO_ALL);
CQLTester.setUpClass();
+ CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
AccordService.startup(ClusterMetadata.current().myNodeId());
- VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance);
requireNetwork();
}
@@ -169,7 +182,8 @@
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1));
- accord.node().coordinate(id, txn);
+ accord.node().coordinate(id, txn).beginAsResult();
+ filter.appliesTo(id);
filter.preAccept.awaitThrowUncheckedOnInterrupt();
filter.apply.awaitThrowUncheckedOnInterrupt();
@@ -198,7 +212,7 @@
TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range, accord.nodeId());
Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(1)), new TokenKey(tableId, new LongToken(100))));
Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(100)), new TokenKey(tableId, new LongToken(200))));
- AsyncChains.getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> {
+ getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> {
safeStore.commandStore().markShardDurable(safeStore, syncId1, ranges1, HasOutcome.Universal);
safeStore.commandStore().markShardDurable(safeStore, syncId2, ranges2, HasOutcome.Quorum);
}));
@@ -218,11 +232,11 @@
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
String keyStr = txn.keys().get(0).toUnseekable().toString();
- AsyncChains.getBlocking(accord.node().coordinate(id, txn));
+ getBlocking(accord.node().coordinate(id, txn));
spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
row(id.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name()))));
- assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), "Applied"));
+ spinUntilSuccess(() -> assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), "Applied")));
assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null));
assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr), row(id.toString(), "APPLIED_DURABLE"));
}
@@ -245,7 +259,8 @@
" END IF\n" +
"COMMIT TRANSACTION", KEYSPACE, tableName, KEYSPACE, tableName);
Txn txn = createTxn(insertTxn, 0, 0, 0, 0, 0);
- accord.node().coordinate(id, txn);
+ filter.appliesTo(id);
+ accord.node().coordinate(id, txn).beginAsResult();
filter.preAccept.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
@@ -280,20 +295,22 @@
" INSERT INTO %s.%s (k, c, v) VALUES (?, ?, ?);\n" +
" END IF\n" +
"COMMIT TRANSACTION", KEYSPACE, tableName, KEYSPACE, tableName);
- accord.node().coordinate(first, createTxn(insertTxn, 0, 0, 0, 0, 0));
+
+ filter.appliesTo(first);
+ accord.node().coordinate(first, createTxn(insertTxn, 0, 0, 0, 0, 0)).beginAsResult();
filter.preAccept.awaitThrowUncheckedOnInterrupt();
- assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
- row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ spinUntilSuccess(() ->assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
+ row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))));
filter.apply.awaitThrowUncheckedOnInterrupt();
- assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
- row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name()));
-
- filter.reset();
+ spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
+ row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name())));
TxnId second = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
- accord.node().coordinate(second, createTxn(insertTxn, 0, 0, 0, 0, 0));
+ filter.reset();
+ filter.appliesTo(second);
+ accord.node().coordinate(second, createTxn(insertTxn, 0, 0, 0, 0, 0)).beginAsResult();
filter.commit.awaitThrowUncheckedOnInterrupt();
@@ -357,7 +374,8 @@
tableName);
AccordService accord = accord();
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
- accord.node().coordinate(id, createTxn(insertTxn, 0, 0, 0));
+ filter.appliesTo(id);
+ accord.node().coordinate(id, createTxn(insertTxn, 0, 0, 0)).beginAsResult();
filter.preAccept.awaitThrowUncheckedOnInterrupt();
String QUERY_JOURNAL = String.format("SELECT txn_id, save_status, command_store_id FROM %s.%s WHERE txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.JOURNAL);
@@ -402,6 +420,15 @@
}
ConcurrentMap<TxnId, ConcurrentSkipListSet<Verb>> txnToVerbs = new ConcurrentHashMap<>();
+ Set<Verb> dropVerbs = Set.of(ACCORD_APPLY_REQ, ACCORD_APPLY_AND_WAIT_REQ, ACCORD_BEGIN_RECOVER_REQ);
+ Set<TxnId> applyTo;
+
+ void appliesTo(TxnId txnId)
+ {
+ if (applyTo == null)
+ applyTo = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ applyTo.add(txnId);
+ }
@Override
public boolean test(Message<?> msg, InetAddressAndPort to)
@@ -412,6 +439,8 @@
if (msg.payload instanceof TxnRequest)
{
txnId = ((TxnRequest<?>) msg.payload).txnId;
+ if (applyTo != null && !applyTo.contains(txnId))
+ return true;
}
Set<Verb> seen;
if (txnId != null)
@@ -424,32 +453,15 @@
case ACCORD_APPLY_REQ:
case ACCORD_APPLY_AND_WAIT_REQ:
apply.signalAll();
- case ACCORD_BEGIN_RECOVER_REQ:
- return false;
+ break;
case ACCORD_PRE_ACCEPT_RSP:
preAccept.signalAll();
- return true;
+ break;
case ACCORD_COMMIT_REQ:
case ACCORD_STABLE_THEN_READ_REQ:
commit.signalAll();
- return true;
- case ACCORD_PRE_ACCEPT_REQ:
- case ACCORD_ACCEPT_REQ:
- case ACCORD_ACCEPT_RSP:
- case ACCORD_CHECK_STATUS_REQ:
- case ACCORD_CHECK_STATUS_RSP:
- case ACCORD_READ_REQ:
- case ACCORD_READ_RSP:
- case ACCORD_AWAIT_REQ:
- case ACCORD_AWAIT_RSP:
- case ACCORD_AWAIT_ASYNC_RSP_REQ:
- return true;
- default:
- // many code paths don't log the error...
- UnsupportedOperationException e = new UnsupportedOperationException(msg.verb().name());
- logger.error("Unexpected verb {}", msg.verb(), e);
- throw e;
}
+ return !dropVerbs.contains(msg.verb());
}
}
}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordAsyncConversionTest.java b/test/unit/org/apache/cassandra/service/accord/AccordAsyncConversionTest.java
new file mode 100644
index 0000000..09cc6e8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/AccordAsyncConversionTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+public class AccordAsyncConversionTest
+{
+ private static final String SUCCESS = "test-success";
+ private static final RuntimeException EXCEPTION = new RuntimeException("test-failure");
+
+ @Test
+ public void testAsyncChainToFutureSuccess()
+ {
+ AsyncChain<String> chain = AsyncChains.success(SUCCESS);
+ Future<String> future = AccordService.toFuture(chain);
+
+ Assert.assertTrue("should be completed", future.isDone());
+ Assert.assertTrue("should be successful", future.isSuccess());
+ Assert.assertEquals("should contain expected value", SUCCESS, future.getNow());
+ }
+
+ @Test
+ public void testAsyncChainToFutureFailure()
+ {
+ AsyncChain<String> chain = AsyncChains.failure(EXCEPTION);
+ Future<String> future = AccordService.toFuture(chain);
+
+ Assert.assertTrue("should be completed", future.isDone());
+ Assert.assertFalse("should be failed", future.isSuccess());
+ Assert.assertEquals("should contain expected exception", EXCEPTION, future.cause());
+ }
+
+ @Test
+ public void testAsyncChainToFutureAsyncCompletion() throws InterruptedException
+ {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ AsyncChain<String> chain = new AsyncChains.Head<String>()
+ {
+ @Override
+ protected accord.utils.async.Cancellable start(BiConsumer<? super String, Throwable> callback)
+ {
+ Thread t = new Thread(() -> {
+ try
+ {
+ startLatch.await();
+ callback.accept(SUCCESS, null);
+ }
+ catch (InterruptedException e)
+ {
+ callback.accept(null, e);
+ }
+ });
+ t.start();
+ return () -> t.interrupt();
+ }
+ };
+
+ Future<String> future = AccordService.toFuture(chain);
+
+ Assert.assertFalse("should not be completed initially", future.isDone());
+
+ startLatch.countDown();
+
+ Assert.assertEquals("should complete with expected value", SUCCESS, future.syncUninterruptibly().getNow());
+ }
+
+ @Test
+ public void testAsyncResultToFutureWhenAlreadyFuture()
+ {
+ AsyncFuture<String> original = ImmediateFuture.success(SUCCESS);
+ Future<String> converted = AccordService.toFuture(original);
+
+ Assert.assertSame("Should return the same instance when already a Future", original, converted);
+ }
+
+ @Test
+ public void testAsyncResultToFutureSuccess()
+ {
+ AsyncResult<String> result = new TestAsyncResult<>(SUCCESS, null);
+ Future<String> future = AccordService.toFuture(result);
+
+ Assert.assertTrue("should be completed", future.isDone());
+ Assert.assertTrue("should be successful", future.isSuccess());
+ Assert.assertEquals("should contain expected value", SUCCESS, future.getNow());
+ }
+
+ @Test
+ public void testAsyncResultToFutureFailure()
+ {
+ AsyncResult<String> result = new TestAsyncResult<>(null, EXCEPTION);
+ Future<String> future = AccordService.toFuture(result);
+
+ Assert.assertTrue("should be completed", future.isDone());
+ Assert.assertFalse("should be failed", future.isSuccess());
+ Assert.assertEquals("should contain expected exception", EXCEPTION, future.cause());
+ }
+
+ @Test
+ public void testGetBlockingAsyncChainSuccess()
+ {
+ AsyncChain<String> chain = AsyncChains.success(SUCCESS);
+ String result = AccordService.getBlocking(chain);
+
+ Assert.assertEquals("Should return expected value", SUCCESS, result);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGetBlockingAsyncChainFailure()
+ {
+ AsyncChain<String> chain = AsyncChains.failure(EXCEPTION);
+ AccordService.getBlocking(chain);
+ }
+
+ @Test
+ public void testGetBlockingAsyncResultSuccess()
+ {
+ AsyncResult<String> result = new TestAsyncResult<>(SUCCESS, null);
+ String value = AccordService.getBlocking(result);
+
+ Assert.assertEquals("Should return expected value", SUCCESS, value);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGetBlockingAsyncResultFailure()
+ {
+ AsyncResult<String> result = new TestAsyncResult<>(null, EXCEPTION);
+ AccordService.getBlocking(result);
+ }
+
+ @Test
+ public void testAsyncChainCancellation() throws InterruptedException
+ {
+ AtomicBoolean cancelled = new AtomicBoolean(false);
+ CountDownLatch startLatch = new CountDownLatch(1);
+
+ AsyncChain<String> chain = new AsyncChains.Head<String>()
+ {
+ @Override
+ protected accord.utils.async.Cancellable start(BiConsumer<? super String, Throwable> callback)
+ {
+ Thread t = new Thread(() -> {
+ try
+ {
+ startLatch.await();
+ Thread.sleep(1000);
+ callback.accept(SUCCESS, null);
+ }
+ catch (InterruptedException e)
+ {
+ cancelled.set(true);
+ callback.accept(null, e);
+ }
+ });
+ t.start();
+ return () -> {
+ cancelled.set(true);
+ t.interrupt();
+ };
+ }
+ };
+
+ Future<String> future = AccordService.toFuture(chain);
+ startLatch.countDown();
+
+ boolean cancelResult = future.cancel(true);
+ Thread.sleep(100);
+
+ Assert.assertTrue("should be cancellable", cancelResult);
+ Assert.assertTrue("should be cancelled", future.isCancelled());
+ }
+
+ @Test
+ public void testMultipleCallbacksOnAsyncResult() throws InterruptedException
+ {
+ AtomicReference<String> callback1Result = new AtomicReference<>();
+ AtomicReference<String> callback2Result = new AtomicReference<>();
+ CountDownLatch callbackLatch = new CountDownLatch(2);
+
+ AsyncResult<String> result = new TestAsyncResult<>(SUCCESS, null);
+
+ result.invoke((value, throwable) -> {
+ callback1Result.set(value);
+ callbackLatch.countDown();
+ });
+
+ result.invoke((value, throwable) -> {
+ callback2Result.set(value);
+ callbackLatch.countDown();
+ });
+
+ Assert.assertTrue("Both callbacks should be invoked", callbackLatch.await(1, TimeUnit.SECONDS));
+ Assert.assertEquals("First callback should receive correct value", SUCCESS, callback1Result.get());
+ Assert.assertEquals("Second callback should receive correct value", SUCCESS, callback2Result.get());
+ }
+
+ @Test
+ public void testNullAsyncResult()
+ {
+ try
+ {
+ AccordService.toFuture((AsyncResult<String>) null);
+ Assert.fail("Should throw NullPointerException for null AsyncResult");
+ }
+ catch (NullPointerException e)
+ {
+ }
+ }
+
+ @Test
+ public void testNullAsyncChain()
+ {
+ try
+ {
+ AccordService.toFuture((AsyncChain<String>) null);
+ Assert.fail("Should throw NullPointerException for null AsyncChain");
+ }
+ catch (NullPointerException e)
+ {
+ // ignore
+ }
+ }
+
+ @Test
+ public void testFutureInterfaceCompliance()
+ {
+ AsyncChain<String> chain = AsyncChains.success(SUCCESS);
+ Future<String> future = AccordService.toFuture(chain);
+
+ Assert.assertTrue("should implement AsyncResult", future instanceof AsyncResult);
+ Assert.assertTrue("should be completion-aware", future.isDone());
+ Assert.assertNotNull("should have a result", future.getNow());
+ }
+
+ @Test
+ public void testConcurrentConversion() throws InterruptedException
+ {
+ int threadCount = 10;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completeLatch = new CountDownLatch(threadCount);
+ AtomicReference<Exception> exception = new AtomicReference<>();
+
+ for (int i = 0; i < threadCount; i++)
+ {
+ final int threadIndex = i;
+ Thread t = new Thread(() -> {
+ try
+ {
+ startLatch.await();
+ AsyncChain<Integer> chain = AsyncChains.success(threadIndex);
+ Future<Integer> future = AccordService.toFuture(chain);
+ Assert.assertEquals("Thread " + threadIndex + " should get correct result",
+ Integer.valueOf(threadIndex), future.getNow());
+ }
+ catch (Exception e)
+ {
+ exception.set(e);
+ }
+ finally
+ {
+ completeLatch.countDown();
+ }
+ });
+ t.start();
+ }
+
+ startLatch.countDown();
+ Assert.assertTrue("All threads should complete", completeLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertNull("No exceptions should occur during concurrent conversion", exception.get());
+ }
+
+ private static class TestAsyncResult<V> implements AsyncResult<V>
+ {
+ private final V successValue;
+ private final Throwable failureValue;
+
+ public TestAsyncResult(V successValue, Throwable failureValue)
+ {
+ this.successValue = successValue;
+ this.failureValue = failureValue;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isSuccess()
+ {
+ return false;
+ }
+
+ @Override
+ public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback)
+ {
+ callback.accept(successValue, failureValue);
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 3fe68ae..49684ac 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -50,7 +50,6 @@
import accord.primitives.Writes;
import accord.utils.ImmutableBitSet;
import accord.utils.LargeBitSet;
-import accord.utils.async.AsyncChains;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -73,6 +72,7 @@
import static accord.primitives.Status.Durability.AllQuorums;
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static org.apache.cassandra.service.accord.AccordTestUtils.Commands.preaccepted;
import static org.apache.cassandra.service.accord.AccordTestUtils.ballot;
import static org.apache.cassandra.service.accord.AccordTestUtils.createAccordCommandStore;
@@ -133,7 +133,7 @@
LargeBitSet waitingOnApply = new LargeBitSet(3);
waitingOnApply.set(1);
Command.WaitingOn waitingOn = new Command.WaitingOn(dependencies.keyDeps.keys(), dependencies.rangeDeps, new ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2));
- Pair<Writes, Result> result = AsyncChains.getBlocking(AccordTestUtils.processTxnResult(commandStore, txnId, txn, executeAt));
+ Pair<Writes, Result> result = getBlocking(AccordTestUtils.processTxnResult(commandStore, txnId, txn, executeAt));
Command expected = Command.Executed.executed(txnId, SaveStatus.Applied, AllQuorums, StoreParticipants.all(route),
promised, executeAt, txn, dependencies, accepted,
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 7a612c1..b714b83 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -59,8 +59,8 @@
import static accord.api.ProtocolModifiers.Toggles.filterDuplicateDependenciesFromAcceptReply;
import static accord.local.LoadKeysFor.READ_WRITE;
import static accord.messages.Accept.Kind.SLOW;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static org.apache.cassandra.service.accord.AccordTestUtils.createAccordCommandStore;
import static org.apache.cassandra.service.accord.AccordTestUtils.createWriteTxn;
import static org.apache.cassandra.service.accord.AccordTestUtils.fullRange;
@@ -97,7 +97,7 @@
public void basicCycleTest() throws Throwable
{
AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- getUninterruptibly(commandStore.execute((PreLoadContext.Empty)() -> "Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
+ getBlocking(commandStore.execute((PreLoadContext.Empty)() -> "Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
Txn txn = createWriteTxn(1);
@@ -109,7 +109,7 @@
PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route, 1, 1, 1, partialTxn, null, false, fullRoute);
// Check preaccept
- getUninterruptibly(commandStore.execute(preAccept, safeStore -> {
+ getBlocking(commandStore.execute(preAccept, safeStore -> {
SafeCommand safeCommand = safeStore.get(txnId, StoreParticipants.all(route));
Command before = safeCommand.current();
PreAccept.PreAcceptReply reply = preAccept.apply(safeStore);
@@ -123,7 +123,7 @@
AccordTestUtils.appendCommandsBlocking(commandStore, before, after);
}));
- getUninterruptibly(commandStore.execute(preAccept, safeStore -> {
+ getBlocking(commandStore.execute(preAccept, safeStore -> {
Command before = safeStore.ifInitialised(txnId).current();
SafeCommand safeCommand = safeStore.get(txnId, StoreParticipants.all(route));
Assert.assertEquals(txnId, before.executeAt());
@@ -147,7 +147,7 @@
}
Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, SLOW, Ballot.ZERO, executeAt, deps, false);
- getUninterruptibly(commandStore.execute(accept, safeStore -> {
+ getBlocking(commandStore.execute(accept, safeStore -> {
Command before = safeStore.ifInitialised(txnId).current();
Accept.AcceptReply reply = accept.apply(safeStore);
Assert.assertTrue(reply.isOk());
@@ -156,7 +156,7 @@
AccordTestUtils.appendCommandsBlocking(commandStore, before, after);
}));
- getUninterruptibly(commandStore.execute(accept, safeStore -> {
+ getBlocking(commandStore.execute(accept, safeStore -> {
Command before = safeStore.ifInitialised(txnId).current();
Assert.assertEquals(executeAt, before.executeAt());
Assert.assertEquals(Status.AcceptedSlow, before.status());
@@ -170,9 +170,9 @@
// check commit
Commit commit = Commit.SerializerSupport.create(txnId, route, 1, 1, Commit.Kind.StableWithTxnAndDeps, Ballot.ZERO, executeAt, partialTxn, deps, fullRoute);
- getUninterruptibly(commandStore.execute(commit, commit::apply));
+ getBlocking(commandStore.execute(commit, commit::apply));
- getUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key).toParticipants(), LoadKeys.SYNC, READ_WRITE, "Test"), safeStore -> {
+ getBlocking(commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key).toParticipants(), LoadKeys.SYNC, READ_WRITE, "Test"), safeStore -> {
Command before = safeStore.ifInitialised(txnId).current();
Assert.assertEquals(commit.executeAt, before.executeAt());
Assert.assertTrue(before.hasBeen(Status.Committed));
@@ -189,7 +189,7 @@
public void computeDeps() throws Throwable
{
AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- getUninterruptibly(commandStore.execute((PreLoadContext.Empty)()->"Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
+ getBlocking(commandStore.execute((PreLoadContext.Empty)()->"Test", unused -> commandStore.executor().cacheUnsafe().setCapacity(0)));
TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1);
Txn txn = createWriteTxn(2);
@@ -200,7 +200,7 @@
PartialTxn partialTxn = txn.intersecting(route, true);
PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1, route, 1, 1, 1, partialTxn, null, false, fullRoute);
- getUninterruptibly(commandStore.execute(preAccept1, safeStore -> {
+ getBlocking(commandStore.execute(preAccept1, safeStore -> {
persistDiff(commandStore, safeStore, txnId1, route, () -> {
preAccept1.apply(safeStore);
});
@@ -209,7 +209,7 @@
// second preaccept should identify txnId1 as a dependency
TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
PreAccept preAccept2 = PreAccept.SerializerSupport.create(txnId2, route, 1, 1, 1, partialTxn, null, false, fullRoute);
- getUninterruptibly(commandStore.execute(preAccept2, safeStore -> {
+ getBlocking(commandStore.execute(preAccept2, safeStore -> {
persistDiff(commandStore, safeStore, txnId2, route, () -> {
PreAccept.PreAcceptReply reply = preAccept2.apply(safeStore);
Assert.assertTrue(reply.isOk());
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
index 0dbc166..520e96f 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
@@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -90,8 +89,8 @@
import static accord.local.LoadKeysFor.READ_WRITE;
import static accord.local.PreLoadContext.contextFor;
import static accord.utils.Property.qt;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static org.apache.cassandra.service.accord.AccordTestUtils.createAccordCommandStore;
import static org.apache.cassandra.service.accord.AccordTestUtils.createPartialTxn;
import static org.apache.cassandra.service.accord.AccordTestUtils.keys;
@@ -128,7 +127,7 @@
AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
- getUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), instance -> {
+ getBlocking(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), instance -> {
// TODO review: This change to `ifInitialized` was done in a lot of places and it doesn't preserve this property
// I fixed this reference to point to `ifLoadedAndInitialised` and but didn't update other places
Assert.assertNull(instance.ifInitialised(txnId));
@@ -142,7 +141,7 @@
AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
- getUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), safe -> {
+ getBlocking(commandStore.execute(PreLoadContext.contextFor(txnId, "Test"), safe -> {
StoreParticipants participants = StoreParticipants.empty(txnId);
SafeCommand command = safe.get(txnId, participants);
Assert.assertNotNull(command);
@@ -156,7 +155,7 @@
Txn txn = AccordTestUtils.createWriteTxn((int)clock.incrementAndGet());
TokenKey key = ((PartitionKey) Iterables.getOnlyElement(txn.keys())).toUnseekable();
- getUninterruptibly(commandStore.execute((PreLoadContext.Empty)() -> "Test", instance -> {
+ getBlocking(commandStore.execute((PreLoadContext.Empty)() -> "Test", instance -> {
SafeCommandsForKey cfk = instance.ifLoadedAndInitialised(key);
Assert.assertNull(cfk);
}));
@@ -199,31 +198,24 @@
route.slice(ranges);
PartialDeps deps = PartialDeps.builder(ranges, true).build();
- try
+ Command command = getBlocking(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
+ CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
+ return safe.ifInitialised(txnId).current();
+ }));
+
+ // clear cache
+ try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
{
- Command command = getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
- CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
- CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
- return safe.ifInitialised(txnId).current();
- }).beginAsResult());
-
- // clear cache
- try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
- {
- long cacheSize = cache.global.capacity();
- cache.global.setCapacity(0);
- cache.global.setCapacity(cacheSize);
- }
-
- while (commandStore.executor().hasTasks())
- LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
-
- return command;
+ long cacheSize = cache.global.capacity();
+ cache.global.setCapacity(0);
+ cache.global.setCapacity(cacheSize);
}
- catch (ExecutionException e)
- {
- throw new AssertionError(e);
- }
+
+ while (commandStore.executor().hasTasks())
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+
+ return command;
}
private static Command createStableUsingSlowLifeCycle(AccordCommandStore commandStore, TxnId txnId)
@@ -249,33 +241,26 @@
Route<?> partialRoute = route.slice(ranges);
PartialDeps deps = PartialDeps.builder(ranges, true).build();
- try
+ Command command = getBlocking(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
+ CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
+ CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, executeAt, deps, appendDiffToLog(commandStore));
+ CheckedCommands.commit(safe, SaveStatus.Committed, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
+ CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
+ return safe.ifInitialised(txnId).current();
+ }));
+
+ // clear cache
+ try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
{
- Command command = getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
- CheckedCommands.preaccept(safe, txnId, partialTxn, route, appendDiffToLog(commandStore));
- CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, executeAt, deps, appendDiffToLog(commandStore));
- CheckedCommands.commit(safe, SaveStatus.Committed, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
- CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, executeAt, deps, appendDiffToLog(commandStore));
- return safe.ifInitialised(txnId).current();
- }).beginAsResult());
-
- // clear cache
- try (ExclusiveGlobalCaches cache = commandStore.executor().lockCaches();)
- {
- long cacheSize = cache.global.capacity();
- cache.global.setCapacity(0);
- cache.global.setCapacity(cacheSize);
- }
-
- while (commandStore.executor().hasTasks())
- LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
-
- return command;
+ long cacheSize = cache.global.capacity();
+ cache.global.setCapacity(0);
+ cache.global.setCapacity(cacheSize);
}
- catch (ExecutionException e)
- {
- throw new AssertionError(e);
- }
+
+ while (commandStore.executor().hasTasks())
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+
+ return command;
}
@Test
@@ -318,7 +303,7 @@
});
}
AccordTask<Void> o1 = AccordTask.create(commandStore, ctx, consumer);
- AssertionUtils.assertThatThrownBy(() -> getUninterruptibly(o1.chain()))
+ AssertionUtils.assertThatThrownBy(() -> getBlocking(o1.chain()))
.hasRootCause()
.isInstanceOf(NullPointerException.class)
.hasNoSuppressedExceptions();
@@ -343,7 +328,7 @@
store.ifInitialised(id).readyToExecute(store);
});
});
- getUninterruptibly(o2.chain());
+ getBlocking(o2.chain());
awaitDone(commandStore, ids, participants);
assertNoReferences(commandStore, ids, participants);
@@ -376,7 +361,7 @@
AccordTask<Void> operation = AccordTask.create(commandStore, ctx, consumer);
- AssertionUtils.assertThatThrownBy(() -> getUninterruptibly(operation.chain()))
+ AssertionUtils.assertThatThrownBy(() -> getBlocking(operation.chain()))
.hasRootCause()
.isInstanceOf(NullPointerException.class)
.hasMessage(errorMsg)
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index e96cf8a..72f84c9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -116,9 +116,9 @@
import static accord.primitives.SaveStatus.PreAccepted;
import static accord.primitives.Status.Durability.NotDurable;
import static accord.primitives.Txn.Kind.Write;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
import static java.lang.String.format;
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
public class AccordTestUtils
{
@@ -238,8 +238,8 @@
public static AsyncChain<Pair<Writes, Result>> processTxnResult(AccordCommandStore commandStore, TxnId txnId, PartialTxn txn, Timestamp executeAt) throws Throwable
{
AtomicReference<AsyncChain<Pair<Writes, Result>>> result = new AtomicReference<>();
- getUninterruptibly(commandStore.execute((PreLoadContext.Empty)() -> "Test",
- safeStore -> result.set(processTxnResultDirect(safeStore, txnId, txn, executeAt))));
+ getBlocking(commandStore.execute((PreLoadContext.Empty)() -> "Test",
+ safeStore -> result.set(processTxnResultDirect(safeStore, txnId, txn, executeAt))));
return result.get();
}
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index fec84f5..636f724 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -690,9 +690,9 @@
public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, boolean startSync)
{
AsyncResult<Void> metadata = schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult();
- AsyncResult<Void> coordination = metadata.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
- AsyncResult<Void> data = coordination.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
- AsyncResult<Void> reads = data.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
+ AsyncResult<Void> coordination = metadata.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult());
+ AsyncResult<Void> data = coordination.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult());
+ AsyncResult<Void> reads = data.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult());
EpochReady ready = new EpochReady(topology.epoch(), metadata, coordination, data, reads);
topology().onTopologyUpdate(topology, () -> ready, e -> {});
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index e0d40cd..b9602de 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -77,7 +77,6 @@
import accord.topology.TopologyManager;
import accord.utils.Gens;
import accord.utils.RandomSource;
-import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
@@ -100,6 +99,7 @@
import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static org.apache.cassandra.utils.AccordGenerators.fromQT;
public class SimulatedAccordCommandStore implements AutoCloseable
@@ -430,7 +430,7 @@
{
var result = processAsync(loadCtx, function);
processAll();
- return AsyncChains.getBlocking(result);
+ return getBlocking(result);
}
public <T extends Reply> AsyncResult<T> processAsync(TxnRequest<T> request)
@@ -440,7 +440,7 @@
public <T extends Reply> AsyncResult<T> processAsync(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
{
- return commandStore.submit(loadCtx, function).beginAsResult();
+ return commandStore.submit(loadCtx, function);
}
public Pair<TxnId, AsyncResult<PreAccept.PreAcceptOk>> enqueuePreAccept(Txn txn, FullRoute<?> route)
@@ -464,7 +464,7 @@
var reply = br.apply(safe);
Assertions.assertThat(reply.kind() == BeginRecovery.RecoverReply.Kind.Ok).isTrue();
return (BeginRecovery.RecoverOk) reply;
- }).beginAsResult());
+ }));
}
public void processAll()
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index f7778c6..0fd249a 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -32,6 +32,7 @@
import org.junit.Before;
import org.junit.BeforeClass;
+import accord.api.AsyncExecutor;
import accord.api.RoutingKey;
import accord.impl.SizeOfIntersectionSorter;
import accord.local.Node;
@@ -56,7 +57,6 @@
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.CassandraRelevantProperties;
@@ -77,6 +77,7 @@
import org.assertj.core.api.Assertions;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
public abstract class SimulatedAccordCommandStoreTestBase extends CQLTester
@@ -202,7 +203,7 @@
{
var pair = assertDepsMessageAsync(instance, messageType, txn, route, keyConflicts, rangeConflicts);
instance.processAll();
- AsyncChains.getBlocking(pair.right);
+ getBlocking(pair.right);
return pair.left;
}
@@ -241,7 +242,7 @@
return Pair.create(pair.left, pair.right.map(success -> {
assertDeps(success.txnId, success.deps, cloneKeyConflicts, cloneRangeConflicts);
return null;
- }).beginAsResult());
+ }));
}
protected static Pair<TxnId, AsyncResult<?>> assertBeginRecoveryAsync(SimulatedAccordCommandStore instance,
@@ -260,7 +261,7 @@
Deps proposeDeps = LatestDeps.mergeProposal(Collections.singletonList(success), ok -> ok.deps);
assertDeps(success.txnId, proposeDeps, cloneKeyConflicts, cloneRangeConflicts);
return null;
- }).beginAsResult());
+ }));
}
protected static Pair<TxnId, AsyncResult<?>> assertBeginRecoveryAfterPreAcceptAsync(SimulatedAccordCommandStore instance,
@@ -285,10 +286,10 @@
assertDeps(success.txnId, success.deps, cloneKeyConflicts, cloneRangeConflicts);
return success;
});
- var delay = preAcceptAsync.flatMap(ignore -> AsyncChains.ofCallable(instance.unorderedScheduled, () -> {
+ var delay = preAcceptAsync.flatMap(ignore -> AsyncExecutor.chain(instance.unorderedScheduled, () -> {
Ballot ballot = Ballot.fromValues(instance.storeService.epoch(), instance.storeService.now(), nodeId);
return new BeginRecovery(nodeId, new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, instance.topology), txnId, null, false, txn, route, ballot);
- }));
+ }).beginAsResult());
var recoverAsync = delay.flatMap(br -> instance.processAsync(br, safe -> {
var reply = br.apply(safe);
Assertions.assertThat(reply.kind() == BeginRecovery.RecoverReply.Kind.Ok).isTrue();
@@ -298,7 +299,7 @@
return success;
}));
- return Pair.create(txnId, recoverAsync.beginAsResult());
+ return Pair.create(txnId, recoverAsync);
}
protected static void assertDeps(TxnId txnId, Deps deps,
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
index d5454e7..d54058b 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
@@ -129,7 +129,7 @@
}
};
instance.maybeCacheEvict(txn.keys().toParticipants());
- instance.processAsync(preAccept).begin(counter);
+ instance.processAsync(preAccept).invoke(counter);
}
break;
default:
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 42fec34..884638d 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -28,7 +28,6 @@
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
@@ -649,14 +648,15 @@
}
@Override public boolean inStore() { return true; }
+ @Override public AsyncChain<Void> chain(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { throw new UnsupportedOperationException();}
+ @Override public <T> AsyncChain<T> chain(PreLoadContext context, Function<? super SafeCommandStore, T> apply) { throw new UnsupportedOperationException(); }
+
@Override public Journal.Replayer replayer() { throw new UnsupportedOperationException(); }
@Override protected void ensureDurable(Ranges ranges, RedundantBefore onSuccess) {}
@Override public Agent agent() { return this; }
- @Override public AsyncChain<Void> build(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { return null; }
- @Override public <T> AsyncChain<T> build(PreLoadContext context, Function<? super SafeCommandStore, T> apply) { throw new UnsupportedOperationException(); }
+ @Override public void execute(Runnable run) {}
@Override public void shutdown() { }
- @Override public <T> AsyncChain<T> build(Callable<T> task) { throw new UnsupportedOperationException(); }
@Override public void onFailedBootstrap(int attempts, String phase, Ranges ranges, Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); }
@Override public void onStale(Timestamp staleSince, Ranges ranges) { throw new UnsupportedOperationException(); }
@Override public void onUncaughtException(Throwable t) { throw new UnsupportedOperationException(); }
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
index 2ec07f7..6a0a1f5 100644
--- a/test/unit/org/apache/cassandra/utils/AssertionUtils.java
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
@@ -26,8 +26,6 @@
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.ThrowableAssert;
-import org.assertj.core.error.BasicErrorMessageFactory;
-import org.assertj.core.internal.Failures;
public class AssertionUtils
{
@@ -191,8 +189,6 @@
public ThrowableAssertPlus hasRootCause()
{
Throwable cause = Throwables.getRootCause(actual);
- if (cause == actual)
- throw Failures.instance().failure(this.info, new BasicErrorMessageFactory("%nExpected a root cause but cause was null", new Object[0]));
return new ThrowableAssertPlus(cause);
}
}