CEP-15: (C*) Improve Burn Tests to include thread scheduling
patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-18203
diff --git a/accord-core/build.gradle b/accord-core/build.gradle
index 3c1472b..bb315b9 100644
--- a/accord-core/build.gradle
+++ b/accord-core/build.gradle
@@ -34,6 +34,8 @@
// These act as runtimeOnly dependencies to users
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'org.agrona:agrona:1.17.1'
+
+ testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.24.2'
}
task burn(type: JavaExec) {
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index e881b91..e81186c 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -819,12 +819,22 @@
public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpochHolder)
{
- super(id, time, agent, store, progressLogFactory, rangesForEpochHolder);
- this.executor = Executors.newSingleThreadExecutor(r -> {
+ this(id, time, agent, store, progressLogFactory, rangesForEpochHolder, Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']');
return thread;
- });
+ }));
+ }
+
+ private SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpochHolder, ExecutorService executor)
+ {
+ super(id, time, agent, store, progressLogFactory, rangesForEpochHolder);
+ this.executor = executor;
+ }
+
+ public static CommandStore.Factory factory(ExecutorService executor)
+ {
+ return (id, time, agent, store, progressLogFactory, rangesForEpoch) -> new SingleThread(id, time, agent, store, progressLogFactory, rangesForEpoch, executor);
}
void assertThread()
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 8d4e275..a4ebb4f 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -224,7 +224,7 @@
private synchronized void readComplete(CommandStore commandStore, Data result)
{
- Invariants.checkState(waitingOn.get(commandStore.id()));
+ Invariants.checkState(waitingOn.get(commandStore.id()), "Waiting on does not contain store %d; waitingOn=%s", commandStore.id(), waitingOn);
logger.trace("{}: read completed on {}", txnId, commandStore);
if (result != null)
data = data == null ? result : data.merge(result);
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java b/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java
index f4a9e63..3d87c45 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java
@@ -165,4 +165,26 @@
return reduce.reducer == reducer;
}
}
+
+ static class ReduceWithIdentity<A, B> extends AsyncChainCombiner<A, B>
+ {
+ private final B identity;
+ private final BiFunction<B, ? super A, B> reducer;
+
+ protected ReduceWithIdentity(List<? extends AsyncChain<? extends A>> inputs, B identity, BiFunction<B, ? super A, B> reducer)
+ {
+ super(inputs);
+ this.identity = identity;
+ this.reducer = reducer;
+ }
+
+ @Override
+ void complete(A[] results, BiConsumer<? super B, Throwable> callback)
+ {
+ B result = identity;
+ for (A r : results)
+ result = reducer.apply(result, r);
+ callback.accept(result, null);
+ }
+ }
}
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index e697d23..06f3f33 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -30,6 +30,7 @@
import accord.api.VisibleForImplementation;
import accord.utils.Invariants;
+import accord.utils.async.AsyncChainCombiner.ReduceWithIdentity;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -423,6 +424,16 @@
return new Reduce<>(Lists.newArrayList(a, b), reducer);
}
+ public static <A, B> AsyncChain<B> reduce(List<? extends AsyncChain<? extends A>> chains, B identity, BiFunction<B, ? super A, B> reducer)
+ {
+ switch (chains.size())
+ {
+ case 0: return AsyncChains.success(identity);
+ case 1: return chains.get(0).map(a -> reducer.apply(identity, a));
+ }
+ return new ReduceWithIdentity<>(chains, identity, reducer);
+ }
+
public static <V> V getBlocking(AsyncChain<V> chain) throws InterruptedException, ExecutionException
{
class Result
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index 995a028..18a923d 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -18,7 +18,6 @@
package accord.burn;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -30,7 +29,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -146,7 +144,7 @@
return i;
}
- static void burn(TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException
+ static void burn(TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency)
{
RandomSource random = new DefaultRandom();
long seed = random.nextLong();
@@ -155,7 +153,7 @@
burn(random, topologyFactory, clients, nodes, keyCount, operations, concurrency);
}
- static void burn(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException
+ static void burn(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency)
{
RandomSource random = new DefaultRandom();
System.out.println(seed);
@@ -163,7 +161,7 @@
burn(random, topologyFactory, clients, nodes, keyCount, operations, concurrency);
}
- static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException, ExecutionException, InterruptedException, TimeoutException
+ static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws ExecutionException, InterruptedException
{
ReconcilingLogger logReconciler = new ReconcilingLogger(logger);
@@ -295,7 +293,7 @@
}
}
- public static void main(String[] args) throws Exception
+ public static void main(String[] args)
{
Long overrideSeed = null;
int count = 1;
@@ -324,12 +322,12 @@
}
@Test
- public void testOne() throws Exception
+ public void testOne()
{
run(ThreadLocalRandom.current().nextLong());
}
- private static void run(long seed) throws Exception
+ private static void run(long seed)
{
logger.info("Seed: {}", seed);
Cluster.trace.trace("Seed: {}", seed);
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 335446d..ffea504 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -34,6 +34,7 @@
import accord.utils.MessageTask;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import com.google.common.collect.Sets;
@@ -185,19 +186,20 @@
return result;
}
- private static Stream<MessageTask> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly) throws ExecutionException
+ private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly)
{
Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
Consumer<Command> commandConsumer = command -> syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), CheckStatusOk::merge);
+ AsyncChain<Void> start;
if (committedOnly)
- getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer)));
+ start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer));
else
- getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer)));
+ start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer));
- return syncMessages.entrySet().stream().map(e -> {
+ return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
CommandSync sync = new CommandSync(e.getKey(), e.getValue(), srcEpoch, trgEpoch);
return MessageTask.of(node, recipients.apply(sync), sync.toString(), sync::process);
- });
+ }));
}
private static final boolean PREACCEPTED = false;
@@ -206,25 +208,23 @@
/**
* Syncs all replicated commands. Overkill, but useful for confirming issues in optimizedSync
*/
- private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) throws ExecutionException
+ private static AsyncChain<Stream<MessageTask>> thoroughSync(Node node, long syncEpoch)
{
Topology syncTopology = node.configService().getTopologyForEpoch(syncEpoch);
Topology localTopology = syncTopology.forNode(node.id());
Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
Ranges ranges = localTopology.ranges();
- Stream<MessageTask> messageStream = Stream.empty();
+ List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>();
for (long epoch=1; epoch<=syncEpoch; epoch++)
- {
- messageStream = Stream.concat(messageStream, syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
- }
- return messageStream;
+ work.add(syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
+ return AsyncChains.reduce(work, Stream.empty(), Stream::concat);
}
/**
* Syncs all newly replicated commands when nodes are gaining ranges and the current epoch
*/
- private static Stream<MessageTask> optimizedSync(Node node, long srcEpoch) throws ExecutionException
+ private static AsyncChain<Stream<MessageTask>> optimizedSync(Node node, long srcEpoch)
{
long trgEpoch = srcEpoch + 1;
Topology syncTopology = node.configService().getTopologyForEpoch(srcEpoch);
@@ -233,7 +233,7 @@
Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().preciseEpochs(cmd.route, trgEpoch, trgEpoch).nodes();
// backfill new replicas with operations from prior epochs
- Stream<MessageTask> messageStream = Stream.empty();
+ List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>(localTopology.shards().size());
for (Shard syncShard : localTopology.shards())
{
for (Shard nextShard : nextTopology.shards())
@@ -254,52 +254,35 @@
Ranges ranges = Ranges.single(intersection);
for (long epoch=1; epoch<srcEpoch; epoch++)
- messageStream = Stream.concat(messageStream, syncEpochCommands(node,
- epoch,
- ranges,
- cmd -> newNodes,
- trgEpoch, COMMITTED_ONLY));
+ work.add(syncEpochCommands(node, epoch, ranges, cmd -> newNodes, trgEpoch, COMMITTED_ONLY));
}
}
// update all current and future replicas with the contents of the sync epoch
- messageStream = Stream.concat(messageStream, syncEpochCommands(node,
- srcEpoch,
- localTopology.ranges(),
- allNodes,
- trgEpoch, PREACCEPTED));
- return messageStream;
+ work.add(syncEpochCommands(node, srcEpoch, localTopology.ranges(), allNodes, trgEpoch, PREACCEPTED));
+
+ return AsyncChains.reduce(work, Stream.empty(), Stream::concat);
}
- public static AsyncResult<Void> sync(Node node, long syncEpoch)
+ private static AsyncChain<Void> sync(Node node, long syncEpoch)
{
- Stream<MessageTask> messageStream;
- try
- {
- messageStream = optimizedSync(node, syncEpoch);
- }
- catch (ExecutionException e)
- {
- return AsyncResults.failure(e.getCause());
- }
+ return optimizedSync(node, syncEpoch)
+ .flatMap(messageStream -> {
+ Iterator<MessageTask> iter = messageStream.iterator();
+ if (!iter.hasNext()) return AsyncResults.success(null);
- Iterator<MessageTask> iter = messageStream.iterator();
- if (!iter.hasNext())
- {
- return AsyncResults.success(null);
- }
+ MessageTask first = iter.next();
+ MessageTask last = first;
+ while (iter.hasNext())
+ {
+ MessageTask next = iter.next();
+ last.addCallback(next);
+ last = next;
+ }
- MessageTask first = iter.next();
- MessageTask last = first;
- while (iter.hasNext())
- {
- MessageTask next = iter.next();
- last.addCallback(next);
- last = next;
- }
-
- first.run();
- return dieExceptionally(last);
+ first.run();
+ return dieExceptionally(last);
+ });
}
public AsyncResult<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 2898303..95b00b9 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -223,7 +223,7 @@
() -> new ListStore(node), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
new ListAgent(30L, onFailure),
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
+ SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending, randomSupplier.get())));
}
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
new file mode 100644
index 0000000..8e84f8e
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.ProgressLog;
+import accord.impl.InMemoryCommandStore;
+import accord.impl.InMemoryCommandStores;
+import accord.local.CommandStores;
+import accord.local.NodeTimeService;
+import accord.local.ShardDistributor;
+import accord.utils.RandomSource;
+
+public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
+{
+ private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService executorService)
+ {
+ super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.SingleThread.factory(executorService));
+ }
+
+ public static CommandStores.Factory factory(PendingQueue pending, RandomSource random)
+ {
+ SimulatedDelayedExecutorService executorService = new SimulatedDelayedExecutorService(pending, random);
+ return (time, agent, store, shardDistributor, progressLogFactory) -> new DelayedCommandStores(time, agent, store, shardDistributor, progressLogFactory, executorService);
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
new file mode 100644
index 0000000..e400d08
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import accord.burn.random.FrequentLargeRange;
+import accord.burn.random.RandomLong;
+import accord.burn.random.RandomWalkRange;
+import accord.utils.RandomSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+public class SimulatedDelayedExecutorService extends AbstractExecutorService
+{
+ private final PendingQueue pending;
+ private final RandomSource random;
+ private final RandomLong jitterInNano;
+
+ public SimulatedDelayedExecutorService(PendingQueue pending, RandomSource random)
+ {
+ this.pending = pending;
+ this.random = random;
+ // this is different from Apache Cassandra Simulator as this is computed differently for each executor
+ // rather than being a global config
+ double ratio = random.nextInt(1, 11) / 100.0D;
+ this.jitterInNano = new FrequentLargeRange(
+ new RandomWalkRange(random, microToNanos(0), microToNanos(50)),
+ new RandomWalkRange(random, microToNanos(50), msToNanos(5)),
+ ratio);
+ }
+
+ private static int msToNanos(int value)
+ {
+ return Math.toIntExact(TimeUnit.MILLISECONDS.toNanos(value));
+ }
+
+ private static int microToNanos(int value)
+ {
+ return Math.toIntExact(TimeUnit.MICROSECONDS.toNanos(value));
+ }
+
+ @Override
+ protected <T> Task<T> newTaskFor(Runnable runnable, T value)
+ {
+ return newTaskFor(Executors.callable(runnable, value));
+ }
+
+ @Override
+ protected <T> Task<T> newTaskFor(Callable<T> callable)
+ {
+ return new Task<>(callable);
+ }
+
+ private Task<?> newTaskFor(Runnable command)
+ {
+ return command instanceof Task ? (Task<?>) command : newTaskFor(command, null);
+ }
+
+ @Override
+ public void execute(Runnable command)
+ {
+ pending.add(newTaskFor(command), jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public void shutdown()
+ {
+ }
+
+ @Override
+ public List<Runnable> shutdownNow()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ {
+ return false;
+ }
+
+
+ private static class Task<T> extends FutureTask<T> implements Pending
+ {
+ public Task(Callable<T> fn)
+ {
+ super(fn);
+ }
+ }
+}
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index 2465a8b..9e663ee 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -42,6 +42,7 @@
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+ testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.0'
testImplementation 'ch.qos.logback:logback-classic:1.2.3'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'