CEP-15: (C*) Improve Burn Tests to include thread scheduling

patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-18203
diff --git a/accord-core/build.gradle b/accord-core/build.gradle
index 3c1472b..bb315b9 100644
--- a/accord-core/build.gradle
+++ b/accord-core/build.gradle
@@ -34,6 +34,8 @@
     // These act as runtimeOnly dependencies to users
     implementation 'org.slf4j:slf4j-api:1.7.36'
     implementation 'org.agrona:agrona:1.17.1'
+
+    testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.24.2'
 }
 
 task burn(type: JavaExec) {
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index e881b91..e81186c 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -819,12 +819,22 @@
 
         public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpochHolder)
         {
-            super(id, time, agent, store, progressLogFactory, rangesForEpochHolder);
-            this.executor = Executors.newSingleThreadExecutor(r -> {
+            this(id, time, agent, store, progressLogFactory, rangesForEpochHolder, Executors.newSingleThreadExecutor(r -> {
                 Thread thread = new Thread(r);
                 thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']');
                 return thread;
-            });
+            }));
+        }
+
+        private SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpochHolder, ExecutorService executor)
+        {
+            super(id, time, agent, store, progressLogFactory, rangesForEpochHolder);
+            this.executor = executor;
+        }
+
+        public static CommandStore.Factory factory(ExecutorService executor)
+        {
+            return (id, time, agent, store, progressLogFactory, rangesForEpoch) -> new SingleThread(id, time, agent, store, progressLogFactory, rangesForEpoch, executor);
         }
 
         void assertThread()
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 8d4e275..a4ebb4f 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -224,7 +224,7 @@
 
     private synchronized void readComplete(CommandStore commandStore, Data result)
     {
-        Invariants.checkState(waitingOn.get(commandStore.id()));
+        Invariants.checkState(waitingOn.get(commandStore.id()), "Waiting on does not contain store %d; waitingOn=%s", commandStore.id(), waitingOn);
         logger.trace("{}: read completed on {}", txnId, commandStore);
         if (result != null)
             data = data == null ? result : data.merge(result);
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java b/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java
index f4a9e63..3d87c45 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java
@@ -165,4 +165,26 @@
             return reduce.reducer == reducer;
         }
     }
+
+    static class ReduceWithIdentity<A, B> extends AsyncChainCombiner<A, B>
+    {
+        private final B identity;
+        private final BiFunction<B, ? super A, B> reducer;
+
+        protected ReduceWithIdentity(List<? extends AsyncChain<? extends A>> inputs, B identity, BiFunction<B, ? super A, B> reducer)
+        {
+            super(inputs);
+            this.identity = identity;
+            this.reducer = reducer;
+        }
+
+        @Override
+        void complete(A[] results, BiConsumer<? super B, Throwable> callback)
+        {
+            B result = identity;
+            for (A r : results)
+                result = reducer.apply(result, r);
+            callback.accept(result, null);
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index e697d23..06f3f33 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -30,6 +30,7 @@
 
 import accord.api.VisibleForImplementation;
 import accord.utils.Invariants;
+import accord.utils.async.AsyncChainCombiner.ReduceWithIdentity;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,6 +424,16 @@
         return new Reduce<>(Lists.newArrayList(a, b), reducer);
     }
 
+    public static <A, B> AsyncChain<B> reduce(List<? extends AsyncChain<? extends A>> chains, B identity, BiFunction<B, ? super A, B> reducer)
+    {
+        switch (chains.size())
+        {
+            case 0: return AsyncChains.success(identity);
+            case 1: return chains.get(0).map(a -> reducer.apply(identity, a));
+        }
+        return new ReduceWithIdentity<>(chains, identity, reducer);
+    }
+
     public static <V> V getBlocking(AsyncChain<V> chain) throws InterruptedException, ExecutionException
     {
         class Result
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index 995a028..18a923d 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -18,7 +18,6 @@
 
 package accord.burn;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,7 +29,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -146,7 +144,7 @@
         return i;
     }
 
-    static void burn(TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException
+    static void burn(TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency)
     {
         RandomSource random = new DefaultRandom();
         long seed = random.nextLong();
@@ -155,7 +153,7 @@
         burn(random, topologyFactory, clients, nodes, keyCount, operations, concurrency);
     }
 
-    static void burn(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException
+    static void burn(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency)
     {
         RandomSource random = new DefaultRandom();
         System.out.println(seed);
@@ -163,7 +161,7 @@
         burn(random, topologyFactory, clients, nodes, keyCount, operations, concurrency);
     }
 
-    static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException, ExecutionException, InterruptedException, TimeoutException
+    static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws ExecutionException, InterruptedException
     {
         ReconcilingLogger logReconciler = new ReconcilingLogger(logger);
 
@@ -295,7 +293,7 @@
         }
     }
 
-    public static void main(String[] args) throws Exception
+    public static void main(String[] args)
     {
         Long overrideSeed = null;
         int count = 1;
@@ -324,12 +322,12 @@
     }
 
     @Test
-    public void testOne() throws Exception
+    public void testOne()
     {
         run(ThreadLocalRandom.current().nextLong());
     }
 
-    private static void run(long seed) throws Exception
+    private static void run(long seed)
     {
         logger.info("Seed: {}", seed);
         Cluster.trace.trace("Seed: {}", seed);
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 335446d..ffea504 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -34,6 +34,7 @@
 import accord.utils.MessageTask;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import com.google.common.collect.Sets;
@@ -185,19 +186,20 @@
         return result;
     }
 
-    private static Stream<MessageTask> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly) throws ExecutionException
+    private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly)
     {
         Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
         Consumer<Command> commandConsumer = command -> syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), CheckStatusOk::merge);
+        AsyncChain<Void> start;
         if (committedOnly)
-            getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer));
         else
-            getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer));
 
-        return syncMessages.entrySet().stream().map(e -> {
+        return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
             CommandSync sync = new CommandSync(e.getKey(), e.getValue(), srcEpoch, trgEpoch);
             return MessageTask.of(node, recipients.apply(sync), sync.toString(), sync::process);
-        });
+        }));
     }
 
     private static final boolean PREACCEPTED = false;
@@ -206,25 +208,23 @@
     /**
      * Syncs all replicated commands. Overkill, but useful for confirming issues in optimizedSync
      */
-    private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) throws ExecutionException
+    private static AsyncChain<Stream<MessageTask>> thoroughSync(Node node, long syncEpoch)
     {
         Topology syncTopology = node.configService().getTopologyForEpoch(syncEpoch);
         Topology localTopology = syncTopology.forNode(node.id());
         Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
 
         Ranges ranges = localTopology.ranges();
-        Stream<MessageTask> messageStream = Stream.empty();
+        List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>();
         for (long epoch=1; epoch<=syncEpoch; epoch++)
-        {
-            messageStream = Stream.concat(messageStream, syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
-        }
-        return messageStream;
+            work.add(syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
+        return AsyncChains.reduce(work, Stream.empty(), Stream::concat);
     }
 
     /**
      * Syncs all newly replicated commands when nodes are gaining ranges and the current epoch
      */
-    private static Stream<MessageTask> optimizedSync(Node node, long srcEpoch) throws ExecutionException
+    private static AsyncChain<Stream<MessageTask>> optimizedSync(Node node, long srcEpoch)
     {
         long trgEpoch = srcEpoch + 1;
         Topology syncTopology = node.configService().getTopologyForEpoch(srcEpoch);
@@ -233,7 +233,7 @@
         Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().preciseEpochs(cmd.route, trgEpoch, trgEpoch).nodes();
 
         // backfill new replicas with operations from prior epochs
-        Stream<MessageTask> messageStream = Stream.empty();
+        List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>(localTopology.shards().size());
         for (Shard syncShard : localTopology.shards())
         {
             for (Shard nextShard : nextTopology.shards())
@@ -254,52 +254,35 @@
 
                 Ranges ranges = Ranges.single(intersection);
                 for (long epoch=1; epoch<srcEpoch; epoch++)
-                    messageStream = Stream.concat(messageStream, syncEpochCommands(node,
-                                                                                   epoch,
-                                                                                   ranges,
-                                                                                   cmd -> newNodes,
-                                                                                   trgEpoch, COMMITTED_ONLY));
+                    work.add(syncEpochCommands(node, epoch, ranges, cmd -> newNodes, trgEpoch, COMMITTED_ONLY));
             }
         }
 
         // update all current and future replicas with the contents of the sync epoch
-        messageStream = Stream.concat(messageStream, syncEpochCommands(node,
-                                                                       srcEpoch,
-                                                                       localTopology.ranges(),
-                                                                       allNodes,
-                                                                       trgEpoch, PREACCEPTED));
-        return messageStream;
+        work.add(syncEpochCommands(node, srcEpoch, localTopology.ranges(), allNodes, trgEpoch, PREACCEPTED));
+
+        return AsyncChains.reduce(work, Stream.empty(), Stream::concat);
     }
 
-    public static AsyncResult<Void> sync(Node node, long syncEpoch)
+    private static AsyncChain<Void> sync(Node node, long syncEpoch)
     {
-        Stream<MessageTask> messageStream;
-        try
-        {
-            messageStream = optimizedSync(node, syncEpoch);
-        }
-        catch (ExecutionException e)
-        {
-            return AsyncResults.failure(e.getCause());
-        }
+        return optimizedSync(node, syncEpoch)
+                .flatMap(messageStream -> {
+                    Iterator<MessageTask> iter = messageStream.iterator();
+                    if (!iter.hasNext()) return AsyncResults.success(null);
 
-        Iterator<MessageTask> iter = messageStream.iterator();
-        if (!iter.hasNext())
-        {
-            return AsyncResults.success(null);
-        }
+                    MessageTask first = iter.next();
+                    MessageTask last = first;
+                    while (iter.hasNext())
+                    {
+                        MessageTask next = iter.next();
+                        last.addCallback(next);
+                        last = next;
+                    }
 
-        MessageTask first = iter.next();
-        MessageTask last = first;
-        while (iter.hasNext())
-        {
-            MessageTask next = iter.next();
-            last.addCallback(next);
-            last = next;
-        }
-
-        first.run();
-        return dieExceptionally(last);
+                    first.run();
+                    return dieExceptionally(last);
+                });
     }
 
     public AsyncResult<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 2898303..95b00b9 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -223,7 +223,7 @@
                                           () -> new ListStore(node), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
                                           new ListAgent(30L, onFailure),
                                           randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
-                                          SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
+                                          SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending, randomSupplier.get())));
             }
 
             List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
new file mode 100644
index 0000000..8e84f8e
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.ProgressLog;
+import accord.impl.InMemoryCommandStore;
+import accord.impl.InMemoryCommandStores;
+import accord.local.CommandStores;
+import accord.local.NodeTimeService;
+import accord.local.ShardDistributor;
+import accord.utils.RandomSource;
+
+public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
+{
+    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService executorService)
+    {
+        super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.SingleThread.factory(executorService));
+    }
+
+    public static CommandStores.Factory factory(PendingQueue pending, RandomSource random)
+    {
+        SimulatedDelayedExecutorService executorService = new SimulatedDelayedExecutorService(pending, random);
+        return (time, agent, store, shardDistributor, progressLogFactory) -> new DelayedCommandStores(time, agent, store, shardDistributor, progressLogFactory, executorService);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
new file mode 100644
index 0000000..e400d08
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import accord.burn.random.FrequentLargeRange;
+import accord.burn.random.RandomLong;
+import accord.burn.random.RandomWalkRange;
+import accord.utils.RandomSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+public class SimulatedDelayedExecutorService extends AbstractExecutorService
+{
+    private final PendingQueue pending;
+    private final RandomSource random;
+    private final RandomLong jitterInNano;
+
+    public SimulatedDelayedExecutorService(PendingQueue pending, RandomSource random)
+    {
+        this.pending = pending;
+        this.random = random;
+        // this is different from Apache Cassandra Simulator as this is computed differently for each executor
+        // rather than being a global config
+        double ratio = random.nextInt(1, 11) / 100.0D;
+        this.jitterInNano = new FrequentLargeRange(
+                new RandomWalkRange(random, microToNanos(0), microToNanos(50)),
+                new RandomWalkRange(random, microToNanos(50), msToNanos(5)),
+                ratio);
+    }
+
+    private static int msToNanos(int value)
+    {
+        return Math.toIntExact(TimeUnit.MILLISECONDS.toNanos(value));
+    }
+
+    private static int microToNanos(int value)
+    {
+        return Math.toIntExact(TimeUnit.MICROSECONDS.toNanos(value));
+    }
+
+    @Override
+    protected <T> Task<T> newTaskFor(Runnable runnable, T value)
+    {
+        return newTaskFor(Executors.callable(runnable, value));
+    }
+
+    @Override
+    protected <T> Task<T> newTaskFor(Callable<T> callable)
+    {
+        return new Task<>(callable);
+    }
+
+    private Task<?> newTaskFor(Runnable command)
+    {
+        return command instanceof Task ? (Task<?>) command : newTaskFor(command, null);
+    }
+
+    @Override
+    public void execute(Runnable command)
+    {
+        pending.add(newTaskFor(command), jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public void shutdown()
+    {
+    }
+
+    @Override
+    public List<Runnable> shutdownNow()
+    {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isShutdown()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+    {
+        return false;
+    }
+
+
+    private static class Task<T> extends FutureTask<T> implements Pending
+    {
+        public Task(Callable<T> fn)
+        {
+            super(fn);
+        }
+    }
+}
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index 2465a8b..9e663ee 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -42,6 +42,7 @@
 
 dependencies {
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+    testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.0'
     testImplementation 'ch.qos.logback:logback-classic:1.2.3'
     testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'