CEP-15: (C*) Improve the chaos generation for Burn Tests: slow/flakey connections and dropped messages (#57)


patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-18451
diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index d8ebab8..d33c123 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -269,6 +269,13 @@
         }
 
         long lastAcked = epochs.lastAcknowledged;
+        // TODO (now, review): lastAcked == 0, lastReceived = 2
+        // if we wait for epoch=1.acknowledge the test seems to wait forever... looks like burn test doesn't ack epoch=1
+        if (lastAcked == 0 && lastReceived > 0)
+        {
+            epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() -> reportTopology(topology, startSync));
+            return;
+        }
         if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
         {
             epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology, startSync));
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 1916f0b..276f631 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -169,10 +169,18 @@
         configService.registerListener(this);
     }
 
-    // TODO (cleanup, testing): remove, only used by Maelstrom
-    public AsyncResult<Void> start()
+    /**
+     * This starts the node for tests and makes sure that the provided topology is acknowledged correctly.  This method is not
+     * safe for production systems as it doesn't handle restarts and partially acknowledged histories
+     * @return {@link EpochReady#metadata}
+     */
+    @VisibleForTesting
+    public AsyncResult<Void> unsafeStart()
     {
-        return onTopologyUpdateInternal(configService.currentTopology(), false).metadata;
+        EpochReady ready = onTopologyUpdateInternal(configService.currentTopology(), false);
+        ready.coordination.addCallback(() -> this.topology.onEpochSyncComplete(id, topology.epoch()));
+        configService.acknowledgeEpoch(ready, false);
+        return ready.metadata;
     }
 
     public CommandStores commandStores()
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java
index cd569ac..0ef8f55 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -23,12 +23,15 @@
 import accord.primitives.Keys;
 import accord.primitives.Seekables;
 import accord.primitives.TxnId;
+import com.google.common.collect.Iterators;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
 import com.google.common.collect.Sets;
 
+import java.util.AbstractCollection;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
@@ -52,6 +55,28 @@
      */
     default Collection<TxnId> additionalTxnIds() { return Collections.emptyList(); }
 
+    default Collection<TxnId> txnIds()
+    {
+        TxnId primaryTxnId = primaryTxnId();
+        Collection<TxnId> additional = additionalTxnIds();
+        if (primaryTxnId == null) return additional;
+        if (additional.isEmpty()) return Collections.singleton(primaryTxnId);
+        return new AbstractCollection<TxnId>()
+        {
+            @Override
+            public Iterator<TxnId> iterator()
+            {
+                return Iterators.concat(Iterators.singletonIterator(primaryTxnId), additional.iterator());
+            }
+
+            @Override
+            public int size()
+            {
+                return 1 + additional.size();
+            }
+        };
+    }
+
     @Inline
     default void forEachId(Consumer<TxnId> consumer)
     {
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
index 1554e5d..44abe4f 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -79,7 +79,7 @@
     }
 
     @Override
-    public synchronized AcceptReply apply(SafeCommandStore safeStore)
+    public AcceptReply apply(SafeCommandStore safeStore)
     {
         // TODO (now): we previously checked isAffectedByBootstrap(txnId) here and took this branch also, try to remember why
         if (minUnsyncedEpoch < txnId.epoch())
diff --git a/accord-core/src/main/java/accord/messages/SafeCallback.java b/accord-core/src/main/java/accord/messages/SafeCallback.java
index a8a1416..0f3cd3a 100644
--- a/accord-core/src/main/java/accord/messages/SafeCallback.java
+++ b/accord-core/src/main/java/accord/messages/SafeCallback.java
@@ -55,6 +55,11 @@
         failure(to, new Timeout(null, null));
     }
 
+    public void onCallbackFailure(Node.Id to, Throwable t)
+    {
+        safeCall(to, t, Callback::onCallbackFailure);
+    }
+
     private interface SafeCall<T, P>
     {
         void accept(Callback<T> callback, Node.Id id, P param) throws Throwable;
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 256e5f8..ec1e81e 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -152,7 +152,7 @@
                              SizeOfIntersectionSorter.SUPPLIER,
                              SimpleProgressLog::new,
                              InMemoryCommandStores.Synchronized::new);
-        awaitUninterruptibly(node.start());
+        awaitUninterruptibly(node.unsafeStart());
         return node;
     }
 }
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index ca63eb0..d8bb0ae 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -40,7 +40,10 @@
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
+import accord.burn.random.FrequentLargeRange;
+import accord.impl.MessageListener;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +56,6 @@
 import accord.impl.basic.RandomDelayQueue.Factory;
 import accord.impl.TopologyFactory;
 import accord.impl.basic.Packet;
-import accord.impl.basic.PendingQueue;
 import accord.impl.basic.SimulatedDelayedExecutorService;
 import accord.impl.list.ListAgent;
 import accord.impl.list.ListQuery;
@@ -80,7 +82,7 @@
 {
     private static final Logger logger = LoggerFactory.getLogger(BurnTest.class);
 
-    static List<Packet> generate(RandomSource random, Function<? super CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int keyCount, int operations)
+    static List<Packet> generate(RandomSource random, MessageListener listener, Function<? super CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int keyCount, int operations)
     {
         List<Key> keys = new ArrayList<>();
         for (int i = 0 ; i < keyCount ; ++i)
@@ -108,7 +110,7 @@
                 Ranges ranges = Ranges.of(requestRanges.toArray(new Range[0]));
                 ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, ranges, ranges);
                 ListQuery query = new ListQuery(client, count);
-                ListRequest request = new ListRequest(new Txn.InMemory(ranges, read, query, null));
+                ListRequest request = new ListRequest(new Txn.InMemory(ranges, read, query, null), listener);
                 packets.add(new Packet(client, node, count, request));
 
 
@@ -135,7 +137,7 @@
                     requestKeys.addAll(update.keySet());
                 ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, readKeys, new Keys(requestKeys));
                 ListQuery query = new ListQuery(client, count);
-                ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update));
+                ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update), listener);
                 packets.add(new Packet(client, node, count, request));
             }
         }
@@ -192,7 +194,7 @@
     {
         List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
         RandomDelayQueue delayQueue = new Factory(random).get();
-        PendingQueue queue = new PropagatingPendingQueue(failures, delayQueue);
+        PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, delayQueue);
         RandomSource retryRandom = random.fork();
         ListAgent agent = new ListAgent(1000L, failures::add, retry -> {
             long delay = retryRandom.nextInt(1, 15);
@@ -200,16 +202,23 @@
         });
 
         Supplier<LongSupplier> nowSupplier = () -> {
-            RandomSource jitter = random.fork();
-            // TODO (expected): jitter should be random walk with intermittent long periods
-            return () -> Math.max(0, delayQueue.nowInMillis() + (jitter.nextInt(10) - 5));
+            RandomSource forked = random.fork();
+            return FrequentLargeRange.builder(forked)
+                                                   .ratio(1, 5)
+                                                   .small(50, 5000, TimeUnit.MICROSECONDS)
+                                                   .large(1, 10, TimeUnit.MILLISECONDS)
+                                                   .build()
+                    .mapAsLong(j -> Math.max(0, queue.nowInMillis() + j))
+                    .asLongSupplier(forked);
         };
 
         StrictSerializabilityVerifier strictSerializable = new StrictSerializabilityVerifier(keyCount);
-        SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent, random.fork());
+        SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent);
         Function<CommandStore, AsyncExecutor> executor = ignore -> globalExecutor;
 
-        Packet[] requests = toArray(generate(random, executor, clients, nodes, keyCount, operations), Packet[]::new);
+        MessageListener listener = MessageListener.get();
+
+        Packet[] requests = toArray(generate(random, listener, executor, clients, nodes, keyCount, operations), Packet[]::new);
         int[] starts = new int[requests.length];
         Packet[] replies = new Packet[requests.length];
 
@@ -239,14 +248,14 @@
             {
                 int i = requestIndex.getAndIncrement();
                 starts[i] = clock.incrementAndGet();
-                queue.add(requests[i]);
+                queue.addNoDelay(requests[i]);
                 if (i == requests.length - 1)
                     onSubmitted.get().run();
             }
 
             try
             {
-                if (reply.responseKeys == null && reply.read != null && reply.read.length == 0)
+                if (!reply.isSuccess() && reply.fault() == ListResult.Fault.HeartBeat)
                     return; // interrupted; will fetch our actual reply once rest of simulation is finished (but wanted to send another request to keep correct number in flight)
 
                 int start = starts[(int)packet.replyId];
@@ -254,13 +263,19 @@
                 logger.debug("{} at [{}, {}]", reply, start, end);
                 replies[(int)packet.replyId] = packet;
 
-                if (reply.responseKeys == null)
+                if (!reply.isSuccess())
                 {
-                    if (reply.read == null) nacks.incrementAndGet();
-                    else if (reply.read.length == 1) lost.incrementAndGet();
-                    else if (reply.read.length == 2) truncated.incrementAndGet();
-                    else if (reply.read.length == 3) failedToCheck.incrementAndGet();
-                    else throw new AssertionError();
+
+                    switch (reply.fault())
+                    {
+                        case Lost:          lost.incrementAndGet();             break;
+                        case Invalidated:   nacks.incrementAndGet();            break;
+                        case Failure:       failedToCheck.incrementAndGet();    break;
+                        case Truncated:     truncated.incrementAndGet();        break;
+                        // txn was applied?, but client saw a timeout, so response isn't known
+                        case Other:                                             break;
+                        default:            throw new AssertionError("Unexpected fault: " + reply.fault());
+                    }
                     return;
                 }
 
@@ -292,7 +307,7 @@
         EnumMap<MessageType, Cluster.Stats> messageStatsMap;
         try
         {
-            messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), () -> queue,
+            messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), listener, () -> queue, queue::checkFailures,
                                           responseSink, globalExecutor,
                                           random::fork, nowSupplier,
                                           topologyFactory, initialRequests::poll,
@@ -313,12 +328,18 @@
         logger.info("Message counts: {}", messageStatsMap.entrySet());
         if (clock.get() != operations * 2)
         {
+            StringBuilder sb = new StringBuilder();
             for (int i = 0 ; i < requests.length ; ++i)
             {
-                logger.info("{}", requests[i]);
-                logger.info("\t\t" + replies[i]);
+                // since this only happens when operations are lost, only log the ones without a reply to lower the amount of noise
+                if (replies[i] == null)
+                {
+                    sb.setLength(0);
+                    sb.append(requests[i]).append("\n\t\t").append(replies[i]);
+                    logger.info(sb.toString());
+                }
             }
-            throw new AssertionError("Incomplete set of responses");
+            throw new AssertionError("Incomplete set of responses; clock=" + clock.get() + ", expected operations=" + (operations * 2));
         }
     }
 
@@ -355,6 +376,7 @@
     }
 
     @Test
+    @Timeout(value = 3, unit = TimeUnit.MINUTES)
     public void testOne()
     {
         run(ThreadLocalRandom.current().nextLong(), 1000);
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index b196023..ae3d3f0 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -61,6 +61,9 @@
 
     public synchronized void syncComplete(Node originator, Collection<Node.Id> cluster, long epoch)
     {
+        // topology is init topology
+        if (pendingTopologies.isEmpty())
+            return;
         Map<Node.Id, Ranges> pending = pendingTopologies.get(epoch);
         if (pending == null || null == pending.remove(originator.id()))
             throw new AssertionError();
diff --git a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
index 4241798..41cae46 100644
--- a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
+++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
@@ -18,59 +18,121 @@
 
 package accord.burn.random;
 
-import accord.utils.Invariants;
+import accord.utils.Gen;
+import accord.utils.Gen.LongGen;
+import accord.utils.Gens;
 import accord.utils.RandomSource;
 
-public class FrequentLargeRange implements RandomLong
-{
-    private final RandomLong small, large;
-    private final double ratio;
-    private final int steps;
-    private final double lower, upper;
-    private int run = -1;
-    private long smallCount = 0, largeCount = 0;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
-    public FrequentLargeRange(RandomLong small, RandomLong large, double ratio)
+public class FrequentLargeRange implements LongGen
+{
+    private final LongGen small, large;
+    private final Gen<Boolean> runs;
+
+    public FrequentLargeRange(LongGen small, LongGen large, double ratio)
     {
-        Invariants.checkArgument(ratio > 0 && ratio <= 1);
         this.small = small;
         this.large = large;
-        this.ratio = ratio;
-        this.steps = (int) (1 / ratio);
-        this.lower = ratio * .8;
-        this.upper = ratio * 1.2;
+        this.runs = Gens.bools().biasedRepeatingRuns(ratio);
     }
 
     @Override
-    public long getLong(RandomSource randomSource)
+    public long nextLong(RandomSource randomSource)
     {
-        if (run != -1)
+        if (runs.next(randomSource)) return large.nextLong(randomSource);
+        else                         return small.nextLong(randomSource);
+    }
+
+    public static Builder builder(RandomSource randomSource)
+    {
+        return new Builder(randomSource);
+    }
+
+    public static class Builder
+    {
+        private final RandomSource random;
+        private Double ratio;
+        private LongGen small, large;
+
+        public Builder(RandomSource random)
         {
-            run--;
-            largeCount++;
-            return large.getLong(randomSource);
+            this.random = random;
         }
-        double currentRatio = largeCount / (double) (smallCount + largeCount);
-        if (currentRatio < lower)
+
+        public Builder ratio(double ratio)
         {
-            // not enough large
-            largeCount++;
-            return large.getLong(randomSource);
+            this.ratio = ratio;
+            return this;
         }
-        if (currentRatio > upper)
+
+        public Builder ratio(int min, int max)
         {
-            // not enough small
-            smallCount++;
-            return small.getLong(randomSource);
+            this.ratio = random.nextInt(min, max) / 100.0D;
+            return this;
         }
-        if (randomSource.nextDouble() < ratio)
+
+        public Builder small(Duration min, Duration max)
         {
-            run = randomSource.nextInt(steps);
-            run--;
-            largeCount++;
-            return large.getLong(randomSource);
+            small = create(min, max);
+            return this;
         }
-        smallCount++;
-        return small.getLong(randomSource);
+
+        public Builder small(long min, long max, TimeUnit unit)
+        {
+            small = create(min, max, unit);
+            return this;
+        }
+
+        public Builder small(long min, TimeUnit minUnit, long max, TimeUnit maxUnit)
+        {
+            small = create(min, minUnit, max, maxUnit);
+            return this;
+        }
+
+        public Builder large(Duration min, Duration max)
+        {
+            large = create(min, max);
+            return this;
+        }
+
+        public Builder large(long min, long max, TimeUnit unit)
+        {
+            large = create(min, max, unit);
+            return this;
+        }
+
+        public Builder large(long min, TimeUnit minUnit, long max, TimeUnit maxUnit)
+        {
+            large = create(min, minUnit, max, maxUnit);
+            return this;
+        }
+
+        private RandomWalkRange create(Duration min, Duration max)
+        {
+            return new RandomWalkRange(random, min.toNanos(), max.toNanos());
+        }
+
+        private RandomWalkRange create(long min, long max, TimeUnit unit)
+        {
+            return create(min, unit, max, unit);
+        }
+
+        private RandomWalkRange create(long min, TimeUnit minUnit, long max, TimeUnit maxUnit)
+        {
+            return new RandomWalkRange(random, minUnit.toNanos(min), maxUnit.toNanos(max));
+        }
+
+        public FrequentLargeRange build()
+        {
+            if (small == null)
+                throw new IllegalStateException("Small range undefined");
+            if (large == null)
+                throw new IllegalStateException("Large range undefined");
+            if (ratio == null)
+                ratio(1, 11);
+            return new FrequentLargeRange(small, large, ratio);
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/burn/random/IntRange.java b/accord-core/src/test/java/accord/burn/random/IntRange.java
index 5ebc2cd..b9aec72 100644
--- a/accord-core/src/test/java/accord/burn/random/IntRange.java
+++ b/accord-core/src/test/java/accord/burn/random/IntRange.java
@@ -18,9 +18,10 @@
 
 package accord.burn.random;
 
+import accord.utils.Gen;
 import accord.utils.RandomSource;
 
-public class IntRange implements RandomInt
+public class IntRange implements Gen.LongGen
 {
     public final int min, max;
     private final int maxDelta;
@@ -34,7 +35,7 @@
     }
 
     @Override
-    public int getInt(RandomSource randomSource)
+    public long nextLong(RandomSource randomSource)
     {
         return min + randomSource.nextInt(maxDelta);
     }
diff --git a/accord-core/src/test/java/accord/burn/random/RandomInt.java b/accord-core/src/test/java/accord/burn/random/RandomInt.java
deleted file mode 100644
index a8fbbd1..0000000
--- a/accord-core/src/test/java/accord/burn/random/RandomInt.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.burn.random;
-
-import accord.utils.RandomSource;
-
-public interface RandomInt extends RandomLong
-{
-    int getInt(RandomSource randomSource);
-
-    @Override
-    default long getLong(RandomSource randomSource)
-    {
-        return getInt(randomSource);
-    }
-}
diff --git a/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java b/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
index 68c134a..d25be7c 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
@@ -46,9 +46,9 @@
     {
         int samples = 1000;
         qt().forAll(Gens.random(), ranges()).check((rs, range) -> {
-            RandomLong randRange = factory.create(rs, range.min, range.max);
+            Gen.LongGen randRange = factory.create(rs, range.min, range.max);
             for (int i = 0; i < samples; i++)
-                Assertions.assertThat(randRange.getLong(rs)).isBetween((long) range.min, (long) range.max);
+                Assertions.assertThat(randRange.nextLong(rs)).isBetween((long) range.min, (long) range.max);
         });
     }
 
@@ -78,6 +78,6 @@
 
     private interface Factory
     {
-        RandomLong create(RandomSource random, int min, int max);
+        Gen.LongGen create(RandomSource random, int min, int max);
     }
 }
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java b/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
index c618cf7..ae3c98e 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
+++ b/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
@@ -18,15 +18,16 @@
 
 package accord.burn.random;
 
+import accord.utils.Gen.LongGen;
 import accord.utils.RandomSource;
 
-public class RandomWalkRange implements RandomLong
+public class RandomWalkRange implements LongGen
 {
-    public final int min, max;
-    private final int maxStepSize;
+    public final long min, max;
+    private final long maxStepSize;
     long cur;
 
-    public RandomWalkRange(RandomSource random, int min, int max)
+    public RandomWalkRange(RandomSource random, long min, long max)
     {
         this.min = min;
         this.max = max;
@@ -35,7 +36,7 @@
     }
 
     @Override
-    public long getLong(RandomSource randomSource)
+    public long nextLong(RandomSource randomSource)
     {
         long step = randomSource.nextLong(-maxStepSize, maxStepSize + 1);
         long cur = this.cur;
@@ -44,7 +45,7 @@
         return cur;
     }
 
-    private static int maxStepSize(RandomSource random, int min, int max)
+    private static long maxStepSize(RandomSource random, long min, long max)
     {
         switch (random.nextInt(3))
         {
diff --git a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java b/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
index caddc24..b16cdec 100644
--- a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
@@ -53,17 +53,17 @@
             this.type = type;
         }
 
-        RandomLong min(RandomSource random)
+        Gen.LongGen min(RandomSource random)
         {
             return create(random, minSmall, maxSmall);
         }
 
-        RandomLong max(RandomSource random)
+        Gen.LongGen max(RandomSource random)
         {
             return create(random, minLarge, maxLarge);
         }
 
-        private RandomLong create(RandomSource random, int min, int max)
+        private Gen.LongGen create(RandomSource random, int min, int max)
         {
             switch (type)
             {
@@ -127,7 +127,7 @@
         int resamples = 0;
         for (int i = 0; i < numSamples; i++)
         {
-            long size = period.getLong(rs);
+            long size = period.nextLong(rs);
             if (size > tc.maxSmall)
             {
                 largeCount++;
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
index bfd83fa..0c9bdea 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -101,7 +101,7 @@
     {
         System.out.println("seed: " + seed);
         RandomSource random = new DefaultRandom(seed);
-        SimulatedDelayedExecutorService executor = new SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new TestAgent(), random);
+        SimulatedDelayedExecutorService executor = new SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new TestAgent());
         return topologies(random, executor).map(topologies -> constructor.apply(random, topologies))
                 .collect(Collectors.toList());
     }
diff --git a/accord-core/src/test/java/accord/impl/MessageListener.java b/accord-core/src/test/java/accord/impl/MessageListener.java
new file mode 100644
index 0000000..faddb09
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/MessageListener.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.impl.basic.NodeSink;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.messages.Message;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+public interface MessageListener
+{
+    enum ClientAction {SUBMIT, SUCCESS, FAILURE, UNKNOWN}
+
+    void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id, Message message);
+
+    void onClientAction(ClientAction action, Node.Id from, TxnId id, Object message);
+
+    static MessageListener get()
+    {
+        if (!DebugListener.DEBUG) return Noop.INSTANCE;
+        return new DebugListener();
+    }
+
+    enum Noop implements MessageListener
+    {
+        INSTANCE;
+
+        @Override
+        public void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id, Message message)
+        {
+
+        }
+
+        @Override
+        public void onClientAction(ClientAction action, Node.Id from, TxnId id, Object message)
+        {
+
+        }
+    }
+
+    class DebugListener implements MessageListener
+    {
+        private static final Logger logger = LoggerFactory.getLogger(DebugListener.class);
+        /**
+         * When running tests, if this is enabled log events will happen for each matching message type
+         */
+        private static final boolean DEBUG = false;
+        /**
+         * When this set is empty all txn events will be logged, but if only specific txn are desired then this filter will limit the logging to just those events
+         */
+        private static final Set<TxnId> txnIdFilter = ImmutableSet.of();
+        private static final Set<TxnReplyId> txnReplies = new HashSet<>();
+
+        private static int ACTION_SIZE = Stream.of(NodeSink.Action.values()).map(Enum::name).mapToInt(String::length).max().getAsInt();
+        private static int CLIENT_ACTION_SIZE = Stream.of(ClientAction.values()).map(Enum::name).mapToInt(String::length).max().getAsInt();
+        private static int ALL_ACTION_SIZE = Math.max(ACTION_SIZE, CLIENT_ACTION_SIZE);
+
+        @Override
+        public void onMessage(NodeSink.Action action, Node.Id from, Node.Id to, long id, Message message)
+        {
+            if (txnIdFilter.isEmpty() || containsTxnId(from, to, id, message))
+                logger.debug("Message {}: From {}, To {}, id {}, Message {}", normalize(action), normalize(from), normalize(to), normalizeMessageId(id), message);
+        }
+
+        @Override
+        public void onClientAction(ClientAction action, Node.Id from, TxnId id, Object message)
+        {
+            if (txnIdFilter.isEmpty() || txnIdFilter.contains(id))
+            {
+                String log = message instanceof Throwable ?
+                             "Client  {}: From {}, To {}, id {}" :
+                             "Client  {}: From {}, To {}, id {}, Message {}";
+                logger.debug(log, normalize(action), normalize(from), normalize(from), normalize(id), normalizeClientMessage(message));
+            }
+        }
+
+        private static Object normalizeClientMessage(Object o)
+        {
+            if (o instanceof Throwable)
+                trimStackTrace((Throwable) o);
+            return o;
+        }
+
+        private static void trimStackTrace(Throwable input)
+        {
+            for (Throwable current = input; current != null; current = current.getCause())
+            {
+                StackTraceElement[] stack = current.getStackTrace();
+                // remove junit as its super dense and not helpful
+                OptionalInt first = IntStream.range(0, stack.length).filter(i -> stack[i].getClassName().startsWith("org.junit")).findFirst();
+                if (first.isPresent())
+                    current.setStackTrace(Arrays.copyOfRange(stack, 0, first.getAsInt()));
+                for (Throwable sup : current.getSuppressed())
+                    trimStackTrace(sup);
+            }
+        }
+
+        private static String normalize(NodeSink.Action action)
+        {
+            return Strings.padStart(action.name(), ALL_ACTION_SIZE, ' ');
+        }
+
+        private static String normalize(ClientAction action)
+        {
+            return Strings.padStart(action.name(), ALL_ACTION_SIZE, ' ');
+        }
+
+        private static String normalize(Node.Id id)
+        {
+            return Strings.padStart(id.toString(), 4, ' ');
+        }
+
+        private static String normalizeMessageId(long id)
+        {
+            return Strings.padStart(Long.toString(id), 14, ' ');
+        }
+
+        private static String normalize(Timestamp ts)
+        {
+            return Strings.padStart(ts.toString(), 14, ' ');
+        }
+
+        public static boolean containsTxnId(Node.Id from, Node.Id to, long id, Message message)
+        {
+            if (message instanceof Request)
+            {
+                if (containsAny((Request) message))
+                {
+                    txnReplies.add(new TxnReplyId(from, to, id));
+                    return true;
+                }
+                return false;
+            }
+            else
+                return txnReplies.contains(new TxnReplyId(to, from, id));
+        }
+
+        private static boolean containsAny(Request message)
+        {
+            if (message instanceof TxnRequest<?>)
+                return txnIdFilter.contains(((TxnRequest<?>) message).txnId);
+            // this includes txn that depend on the txn, should this limit for the first txnId?
+            if (message instanceof PreLoadContext)
+                return ((PreLoadContext) message).txnIds().stream().anyMatch(txnIdFilter::contains);
+            return false;
+        }
+
+        private static class TxnReplyId
+        {
+            final Node.Id from;
+            final Node.Id to;
+            final long id;
+
+            private TxnReplyId(Node.Id from, Node.Id to, long id)
+            {
+                this.from = from;
+                this.to = to;
+                this.id = id;
+            }
+
+            @Override
+            public boolean equals(Object o)
+            {
+                if (this == o) return true;
+                if (o == null || getClass() != o.getClass()) return false;
+                TxnReplyId that = (TxnReplyId) o;
+                return id == that.id && Objects.equals(from, that.from) && Objects.equals(to, that.to);
+            }
+
+            @Override
+            public int hashCode()
+            {
+                return Objects.hash(from, to, id);
+            }
+
+            @Override
+            public String toString()
+            {
+                return "TxnReplyId{" +
+                       "from=" + from +
+                       ", to=" + to +
+                       ", id=" + id +
+                       '}';
+            }
+        }
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 5dad1de..853dfe7 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -35,6 +35,7 @@
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import accord.impl.MessageListener;
 import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.MessageType;
+import accord.messages.Message;
 import accord.messages.Reply;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
@@ -81,18 +83,24 @@
 
     EnumMap<MessageType, Stats> statsMap = new EnumMap<>(MessageType.class);
 
+    final RandomSource randomSource;
     final Function<Id, Node> lookup;
     final PendingQueue pending;
+    final Runnable checkFailures;
     final List<Runnable> onDone = new ArrayList<>();
     final Consumer<Packet> responseSink;
     final Map<Id, NodeSink> sinks = new HashMap<>();
+    final MessageListener messageListener;
     int clock;
     int recurring;
     Set<Id> partitionSet;
 
-    public Cluster(Supplier<PendingQueue> queueSupplier, Function<Id, Node> lookup, Consumer<Packet> responseSink)
+    public Cluster(RandomSource randomSource, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id, Node> lookup, Consumer<Packet> responseSink)
     {
+        this.randomSource = randomSource;
+        this.messageListener = messageListener;
         this.pending = queueSupplier.get();
+        this.checkFailures = checkFailures;
         this.lookup = lookup;
         this.responseSink = responseSink;
         this.partitionSet = new HashSet<>();
@@ -105,26 +113,16 @@
         return sink;
     }
 
-    private void add(Packet packet)
+    void add(Packet packet, long delay, TimeUnit unit)
     {
         MessageType type = packet.message.type();
         if (type != null)
             statsMap.computeIfAbsent(type, ignore -> new Stats()).count++;
-        boolean isReply = packet.message instanceof Reply;
         if (trace.isTraceEnabled())
-            trace.trace("{} {} {}", clock++, isReply ? "RPLY" : "SEND", packet);
+            trace.trace("{} {} {}", clock++, packet.message instanceof Reply ? "RPLY" : "SEND", packet);
         if (lookup.apply(packet.dst) == null) responseSink.accept(packet);
-        else pending.add(packet);
-    }
+        else                                  pending.add(packet, delay, unit);
 
-    void add(Id from, Id to, long messageId, Request send)
-    {
-        add(new Packet(from, to, messageId, send));
-    }
-
-    void add(Id from, Id to, long replyId, Reply send)
-    {
-        add(new Packet(from, to, replyId, send));
     }
 
     public void processAll()
@@ -139,6 +137,7 @@
 
     public boolean processPending()
     {
+        checkFailures.run();
         if (pending.size() == recurring)
             return false;
 
@@ -147,6 +146,7 @@
             return false;
 
         processNext(next);
+        checkFailures.run();
         return true;
     }
 
@@ -157,18 +157,6 @@
             Packet deliver = (Packet) next;
             Node on = lookup.apply(deliver.dst);
 
-            // TODO (required, testing): random drop chance independent of partition; also port flaky connections etc. from simulator
-            // Drop the message if it goes across the partition
-            boolean drop = ((Packet) next).src.id >= 0 &&
-                           !(partitionSet.contains(deliver.src) && partitionSet.contains(deliver.dst)
-                             || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dst));
-            if (drop)
-            {
-                if (trace.isTraceEnabled())
-                    trace.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver);
-                return;
-            }
-
             if (trace.isTraceEnabled())
                 trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
 
@@ -190,6 +178,12 @@
         }
     }
 
+    public void notifyDropped(Node.Id from, Node.Id to, long id, Message message)
+    {
+        if (trace.isTraceEnabled())
+            trace.trace("{} DROP[{}] (from:{}, to:{}, {}:{}, body:{})", clock++, lookup.apply(to).epoch(), from, to, message instanceof Reply ? "replyTo" : "id", id, message);
+    }
+
     @Override
     public Scheduled recurring(Runnable run, long delay, TimeUnit units)
     {
@@ -219,13 +213,13 @@
         run.run();
     }
 
-    public static EnumMap<MessageType, Stats> run(Id[] nodes, Supplier<PendingQueue> queueSupplier, Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
+    public static EnumMap<MessageType, Stats> run(Id[] nodes, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
     {
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> lookup = new LinkedHashMap<>();
         try
         {
-            Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink);
+            Cluster sinks = new Cluster(randomSupplier.get(), messageListener, queueSupplier, checkFailures, lookup::get, responseSink);
             TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
             TopologyRandomizer configRandomizer = new TopologyRandomizer(randomSupplier, topology, topologyUpdates, lookup::get);
             List<CoordinateDurabilityScheduling> durabilityScheduling = new ArrayList<>();
@@ -248,7 +242,7 @@
             }
 
             // startup
-            AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::start).collect(toList()), (a, b) -> null).beginAsResult();
+            AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult();
             while (sinks.processPending());
             Assertions.assertTrue(startup.isDone());
 
@@ -270,7 +264,7 @@
 
             Packet next;
             while ((next = in.get()) != null)
-                sinks.add(next);
+                sinks.add(next, 0, TimeUnit.NANOSECONDS);
 
             while (sinks.processPending());
 
@@ -289,8 +283,9 @@
 
             while (!sinks.onDone.isEmpty())
             {
-                sinks.onDone.forEach(Runnable::run);
+                List<Runnable> onDone = new ArrayList<>(sinks.onDone);
                 sinks.onDone.clear();
+                onDone.forEach(Runnable::run);
                 while (sinks.processPending());
             }
 
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index cc026c8..78d76a7 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -49,7 +49,7 @@
     public static CommandStores.Factory factory(PendingQueue pending)
     {
         return (time, agent, store, random, shardDistributor, progressLogFactory) ->
-               new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, new SimulatedDelayedExecutorService(pending, agent, random));
+               new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, new SimulatedDelayedExecutorService(pending, agent));
     }
 
     public static class DelayedCommandStore extends InMemoryCommandStore
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index e14fb52..a19d696 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -18,13 +18,22 @@
 
 package accord.impl.basic;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import accord.api.MessageSink;
 import accord.local.AgentExecutor;
+import accord.burn.random.FrequentLargeRange;
+import accord.messages.SafeCallback;
+import accord.messages.Message;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.RandomSource;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.messages.Callback;
@@ -32,13 +41,16 @@
 import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
-import accord.messages.SafeCallback;
-import accord.utils.RandomSource;
 
 import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 public class NodeSink implements MessageSink
 {
+    public enum Action {DELIVER, DROP, DROP_PARTITIONED, DELIVER_WITH_FAILURE, FAILURE}
+
+    private final Map<Id, Supplier<Action>> nodeActions = new HashMap<>();
+    private final Map<Id, LongSupplier> networkJitter = new HashMap<>();
     final Id self;
     final Function<Id, Node> lookup;
     final Cluster parent;
@@ -56,9 +68,9 @@
     }
 
     @Override
-    public synchronized void send(Id to, Request send)
+    public void send(Id to, Request send)
     {
-        parent.add(self, to, SENTINEL_MESSAGE_ID, send);
+        maybeEnqueue(to, SENTINEL_MESSAGE_ID, send, null);
     }
 
     @Override
@@ -67,21 +79,116 @@
         long messageId = nextMessageId++;
         SafeCallback sc = new SafeCallback(executor, callback);
         callbacks.put(messageId, sc);
-        parent.add(self, to, messageId, send);
-        parent.pending.add((PendingRunnable) () -> {
-            if (sc == callbacks.get(messageId))
-                sc.slowResponse(to);
-        }, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
-        parent.pending.add((PendingRunnable) () -> {
-            if (sc == callbacks.remove(messageId))
-                sc.timeout(to);
-        }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+        if (maybeEnqueue(to, messageId, send, sc))
+        {
+            parent.pending.add((PendingRunnable) () -> {
+                if (sc == callbacks.get(messageId))
+                    sc.slowResponse(to);
+            }, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
+            parent.pending.add((PendingRunnable) () -> {
+                if (sc == callbacks.remove(messageId))
+                    sc.timeout(to);
+            }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
     public void reply(Id replyToNode, ReplyContext replyContext, Reply reply)
     {
-        parent.add(self, replyToNode, Packet.getMessageId(replyContext), reply);
+        maybeEnqueue(replyToNode, Packet.getMessageId(replyContext), reply, null);
+    }
+
+    private boolean maybeEnqueue(Node.Id to, long id, Message message, SafeCallback callback)
+    {
+        Runnable task = () -> {
+            Packet packet;
+            if (message instanceof Reply) packet = new Packet(self, to, id, (Reply) message);
+            else                          packet = new Packet(self, to, id, (Request) message);
+            parent.add(packet, networkJitterNanos(to), TimeUnit.NANOSECONDS);
+        };
+        if (to.equals(self) || lookup.apply(to) == null /* client */)
+        {
+            parent.messageListener.onMessage(Action.DELIVER, self, to, id, message);
+            task.run();
+            return true;
+        }
+
+        Action action = partitioned(to) ? Action.DROP_PARTITIONED
+                                        // call actions() per node so each one has different "runs" state
+                                        : nodeActions.computeIfAbsent(to, ignore -> actions()).get();
+        parent.messageListener.onMessage(action, self, to, id, message);
+        switch (action)
+        {
+            case DELIVER:
+                task.run();
+                return true;
+            case DELIVER_WITH_FAILURE:
+                task.run();
+            case FAILURE:
+
+                if (action == Action.FAILURE)
+                    parent.notifyDropped(self, to, id, message);
+                if (callback != null)
+                {
+                    parent.pending.add((PendingRunnable) () -> {
+                        if (callback == callbacks.remove(id))
+                        {
+                            try
+                            {
+                                callback.failure(to, new SimulatedFault("Simulation Failure; src=" + self + ", to=" + to + ", id=" + id + ", message=" + message));
+                            }
+                            catch (Throwable t)
+                            {
+                                callback.onCallbackFailure(to, t);
+                                lookup.apply(self).agent().onUncaughtException(t);
+                            }
+                        }
+                    }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+                }
+                return false;
+            case DROP_PARTITIONED:
+            case DROP:
+                // TODO (consistency): parent.notifyDropped is a trace logger that is very similar in spirit to MessageListener; can we unify?
+                parent.notifyDropped(self, to, id, message);
+                return true;
+            default:
+                throw new AssertionError("Unexpected action: " + action);
+        }
+    }
+
+    private long networkJitterNanos(Node.Id dst)
+    {
+        return networkJitter.computeIfAbsent(dst, ignore -> defaultJitter())
+                            .getAsLong();
+    }
+
+    private LongSupplier defaultJitter()
+    {
+        return FrequentLargeRange.builder(random)
+                                 .ratio(1, 5)
+                                 .small(500, TimeUnit.MICROSECONDS, 5, TimeUnit.MILLISECONDS)
+                                 .large(50, TimeUnit.MILLISECONDS, 5, SECONDS)
+                                 .build()
+                                 .asLongSupplier(random);
+    }
+
+    private boolean partitioned(Id to)
+    {
+        return parent.partitionSet.contains(self) != parent.partitionSet.contains(to);
+    }
+
+    private Supplier<Action> actions()
+    {
+        Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01);
+        Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01);
+        Gen<Action> actionGen = rs -> {
+            if (drops.next(rs))
+                return Action.DROP;
+            return failures.next(rs) ?
+                   rs.nextBoolean() ? Action.FAILURE : Action.DELIVER_WITH_FAILURE
+                   : Action.DELIVER;
+        };
+        return actionGen.asSupplier(random);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
index e070dd8..fd4932d 100644
--- a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
@@ -23,6 +23,7 @@
 public interface PendingQueue
 {
     void add(Pending item);
+    void addNoDelay(Pending item);
     void add(Pending item, long delay, TimeUnit units);
     boolean remove(Pending item);
     Pending poll();
diff --git a/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java b/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
index a45b53e..bd25ac8 100644
--- a/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
@@ -39,6 +39,12 @@
     }
 
     @Override
+    public void addNoDelay(Pending item)
+    {
+        wrapped.addNoDelay(item);
+    }
+
+    @Override
     public void add(Pending item, long delay, TimeUnit units)
     {
         wrapped.add(item, delay, units);
@@ -53,6 +59,12 @@
     @Override
     public Pending poll()
     {
+        checkFailures();
+        return wrapped.poll();
+    }
+
+    public void checkFailures()
+    {
         if (!failures.isEmpty())
         {
             AssertionError assertion = null;
@@ -74,8 +86,6 @@
             }
             throw assertion;
         }
-
-        return wrapped.poll();
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
index b0cec4d..85eb236 100644
--- a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
@@ -18,12 +18,14 @@
 
 package accord.impl.basic;
 
+import accord.burn.random.FrequentLargeRange;
+import accord.utils.RandomSource;
+
 import java.util.PriorityQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
-import accord.utils.RandomSource;
-
 public class RandomDelayQueue implements PendingQueue
 {
     public static class Factory implements Supplier<RandomDelayQueue>
@@ -87,25 +89,38 @@
     }
 
     final PriorityQueue<Item> queue = new PriorityQueue<>();
-    final RandomSource random;
+    private final LongSupplier jitterMillis;
     long now;
     int seq;
 
     RandomDelayQueue(RandomSource random)
     {
-        this.random = random;
+        this.jitterMillis = FrequentLargeRange.builder(random)
+                                              .small(0, 50, TimeUnit.MICROSECONDS)
+                                              .large(50, TimeUnit.MICROSECONDS, 5, TimeUnit.MILLISECONDS)
+                                              .build()
+                                              .mapAsLong(TimeUnit.NANOSECONDS::toMillis)
+                                              .asLongSupplier(random);
     }
 
     @Override
     public void add(Pending item)
     {
-        add(item, random.nextInt(500), TimeUnit.MILLISECONDS);
+        add(item, 0, TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public void addNoDelay(Pending item)
+    {
+        queue.add(new Item(now, seq++, item));
     }
 
     @Override
     public void add(Pending item, long delay, TimeUnit units)
     {
-        queue.add(new Item(now + units.toMillis(delay), seq++, item));
+        if (delay < 0)
+            throw new IllegalArgumentException("Delay must be positive or 0, but given " + delay);
+        queue.add(new Item(now + units.toMillis(delay) + jitterMillis.getAsLong(), seq++, item));
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 2f3bd1b..d1971b5 100644
--- a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -26,10 +26,6 @@
 import java.util.concurrent.TimeUnit;
 
 import accord.api.Agent;
-import accord.burn.random.FrequentLargeRange;
-import accord.burn.random.RandomLong;
-import accord.burn.random.RandomWalkRange;
-import accord.utils.RandomSource;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
@@ -134,42 +130,23 @@
 
     private final PendingQueue pending;
     private final Agent agent;
-    private final RandomSource random;
-    private final RandomLong jitterInNano;
     private long sequenceNumber;
 
-    public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent, RandomSource random)
+    public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent)
     {
         this.pending = pending;
         this.agent = agent;
-        this.random = random;
-        // this is different from Apache Cassandra Simulator as this is computed differently for each executor
-        // rather than being a global config
-        double ratio = random.nextInt(1, 11) / 100.0D;
-        this.jitterInNano = new FrequentLargeRange(new RandomWalkRange(random, microToNanos(0), microToNanos(50)),
-                                                   new RandomWalkRange(random, microToNanos(50), msToNanos(5)),
-                                                   ratio);
-    }
-
-    private static int msToNanos(int value)
-    {
-        return Math.toIntExact(TimeUnit.MILLISECONDS.toNanos(value));
-    }
-
-    private static int microToNanos(int value)
-    {
-        return Math.toIntExact(TimeUnit.MICROSECONDS.toNanos(value));
     }
 
     @Override
     public void execute(Task<?> task)
     {
-        pending.add(task, jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
+        pending.add(task);
     }
 
     private void schedule(Task<?> task, long delay, TimeUnit unit)
     {
-        pending.add(task, unit.toNanos(delay) + jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
+        pending.add(task, delay, unit);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/burn/random/RandomLong.java b/accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
similarity index 82%
rename from accord-core/src/test/java/accord/burn/random/RandomLong.java
rename to accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
index 1bc8c48..3af4ead 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomLong.java
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
@@ -16,11 +16,12 @@
  * limitations under the License.
  */
 
-package accord.burn.random;
+package accord.impl.basic;
 
-import accord.utils.RandomSource;
-
-public interface RandomLong
+public class SimulatedFault extends AssertionError
 {
-    long getLong(RandomSource randomSource);
+    public SimulatedFault(Object detailMessage)
+    {
+        super(detailMessage);
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 5fb3924..1a9611c 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -68,7 +68,7 @@
     @Override
     public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next)
     {
-        throw new AssertionError("Inconsistent execution timestamp detected for txnId " + command.txnId() + ": " + prev + " != " + next);
+        throw new AssertionError("Inconsistent execution timestamp detected for command " + command + ": " + prev + " != " + next);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index c6127a3..2f2084a 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -18,18 +18,20 @@
 
 package accord.impl.list;
 
-import java.util.function.BiConsumer;
-
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.coordinate.CheckShards;
 import accord.coordinate.CoordinationFailed;
 import accord.coordinate.Invalidated;
 import accord.coordinate.Truncated;
+import accord.coordinate.Timeout;
+import accord.impl.MessageListener;
 import accord.impl.basic.Cluster;
 import accord.impl.basic.Packet;
+import accord.impl.basic.SimulatedFault;
 import accord.local.Node;
 import accord.local.Node.Id;
+import accord.local.SaveStatus;
 import accord.local.Status;
 import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.CheckStatus.IncludeInfo;
@@ -40,7 +42,11 @@
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 
+import javax.annotation.Nullable;
+
 import static accord.local.Status.Phase.Cleanup;
+import java.util.function.BiConsumer;
+
 import static accord.local.Status.PreApplied;
 import static accord.local.Status.PreCommitted;
 
@@ -68,6 +74,10 @@
         protected Action checkSufficient(Id from, CheckStatusOk ok)
         {
             ++count;
+            // this method is called for each reply, so if we see a reply where the status is not known, it may be known on others;
+            // once all status are merged, then onDone will apply aditional logic to make sure things are safe.
+            if (ok.saveStatus == SaveStatus.Uninitialised)
+                return Action.ApproveIfQuorum;
             return ok.saveStatus.hasBeen(PreApplied) ? Action.Approve : Action.Reject;
         }
 
@@ -94,74 +104,125 @@
         final Node node;
         final Id client;
         final ReplyContext replyContext;
+        final MessageListener listener;
+        final TxnId id;
         final Txn txn;
 
-        ResultCallback(Node node, Id client, ReplyContext replyContext, Txn txn)
+        ResultCallback(Node node, Id client, ReplyContext replyContext, MessageListener listener, TxnId id, Txn txn)
         {
             this.node = node;
             this.client = client;
             this.replyContext = replyContext;
+            this.listener = listener;
+            this.id = id;
             this.txn = txn;
         }
 
         @Override
         public void accept(Result success, Throwable fail)
         {
-            if (fail instanceof CoordinationFailed)
+            if (fail != null)
             {
-                RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
-                TxnId txnId = ((CoordinationFailed) fail).txnId();
-                if (fail instanceof Invalidated)
+                listener.onClientAction(MessageListener.ClientAction.FAILURE, node.id(), id, fail);
+                if (fail instanceof CoordinationFailed)
                 {
-                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
-                    return;
-                }
+                    RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
+                    TxnId txnId = ((CoordinationFailed) fail).txnId();
+                    if (fail instanceof Invalidated)
+                    {
+                        node.reply(client, replyContext, ListResult.invalidated(client, ((Packet)replyContext).requestId, txnId), null);
+                        return;
+                    }
 
-                node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null), null);
-                ((Cluster)node.scheduler()).onDone(() -> {
-                    node.commandStores()
-                        .select(homeKey)
-                        .execute(() -> CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f) -> {
-                            if (f != null)
-                            {
-                                node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null), null);
-                                return;
-                            }
-                            switch (s)
-                            {
-                                case Truncated:
-                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null), null);
-                                    break;
-                                case Invalidated:
-                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
-                                    break;
-                                case Lost:
-                                    node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null), null);
-                                    break;
-                                case Other:
-                                    // currently caught elsewhere in response tracking, but might help to throw an exception here
-                            }
-                        }));
-                });
+                    node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, txnId), null);
+                    ((Cluster) node.scheduler()).onDone(() -> checkOnResult(homeKey, txnId, 0, null));
+                }
+                else if (fail instanceof SimulatedFault)
+                {
+                    node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, id), null);
+                    ((Cluster) node.scheduler()).onDone(() -> checkOnResult(null, id, 0, null));
+                }
+                else
+                {
+                    node.agent().onUncaughtException(fail);
+                }
+            }
+            else if (success != null)
+            {
+                listener.onClientAction(MessageListener.ClientAction.SUCCESS, node.id(), id, success);
+                node.reply(client, replyContext, (ListResult) success, null);
             }
             else
             {
-                node.reply(client, replyContext, (ListResult) success, fail);
+                listener.onClientAction(MessageListener.ClientAction.UNKNOWN, node.id(), id, null);
+                node.agent().onUncaughtException(new NullPointerException("Success and Failure were both null"));
             }
         }
+
+        private void checkOnResult(@Nullable RoutingKey homeKey, TxnId txnId, int attempt, Throwable t) {
+            if (attempt == 3)
+            {
+                node.agent().onUncaughtException(t);
+                return;
+            }
+            if (homeKey == null)
+                homeKey = node.selectRandomHomeKey(txnId);
+            RoutingKey finalHomeKey = homeKey;
+            node.commandStores().select(homeKey).execute(() -> CheckOnResult.checkOnResult(node, txnId, finalHomeKey, (s, f) -> {
+                if (f != null)
+                {
+                    if (f instanceof Truncated)
+                    {
+                        node.reply(client, replyContext, ListResult.truncated(client, ((Packet)replyContext).requestId, txnId), null);
+                        return;
+                    }
+                    if (f instanceof Timeout || f instanceof SimulatedFault) checkOnResult(finalHomeKey, txnId, attempt + 1, f);
+                    else
+                    {
+                        node.reply(client, replyContext, ListResult.failure(client, ((Packet)replyContext).requestId, txnId), null);
+                        node.agent().onUncaughtException(f);
+                    }
+                    return;
+                }
+                switch (s)
+                {
+                    case Truncated:
+                        node.reply(client, replyContext, ListResult.truncated(client, ((Packet)replyContext).requestId, txnId), null);
+                        break;
+                    case Invalidated:
+                        node.reply(client, replyContext, ListResult.invalidated(client, ((Packet)replyContext).requestId, txnId), null);
+                        break;
+                    case Lost:
+                        node.reply(client, replyContext, ListResult.lost(client, ((Packet)replyContext).requestId, txnId), null);
+                        break;
+                    case Other:
+                        node.reply(client, replyContext, ListResult.other(client, ((Packet)replyContext).requestId, txnId), null);
+                        break;
+                    default:
+                        node.agent().onUncaughtException(new AssertionError("Unknown outcome: " + s));
+                }
+            }));
+        }
     }
 
     public final Txn txn;
+    private final MessageListener listener;
+    private TxnId id;
 
-    public ListRequest(Txn txn)
+    public ListRequest(Txn txn, MessageListener listener)
     {
         this.txn = txn;
+        this.listener = listener;
     }
 
     @Override
     public void process(Node node, Id client, ReplyContext replyContext)
     {
-        node.coordinate(txn).addCallback(new ResultCallback(node, client, replyContext, txn));
+        if (id != null)
+            throw new IllegalStateException("Called process multiple times");
+        id = node.nextTxnId(txn.kind(), txn.keys().domain());
+        listener.onClientAction(MessageListener.ClientAction.SUBMIT, node.id(), id, txn);
+        node.coordinate(id, txn).addCallback(new ResultCallback(node, client, replyContext, listener, id, txn));
     }
 
     @Override
@@ -173,7 +234,7 @@
     @Override
     public String toString()
     {
-        return txn.toString();
+        return id == null ? txn.toString() : id + " -> " + txn;
     }
 
 }
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/test/java/accord/impl/list/ListResult.java
index dc3311b..88ffeb2 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -32,6 +32,7 @@
 
 public class ListResult implements Result, Reply
 {
+    public enum Fault { HeartBeat, Invalidated, Lost, Other, Truncated, Failure }
     public final Id client;
     public final long requestId;
     public final TxnId txnId;
@@ -39,6 +40,7 @@
     public final Keys responseKeys;
     public final int[][] read; // equal in size to keys.size()
     public final ListUpdate update;
+    private final Fault fault;
 
     public ListResult(Id client, long requestId, TxnId txnId, Seekables<?, ?> readKeys, Keys responseKeys, int[][] read, ListUpdate update)
     {
@@ -49,6 +51,49 @@
         this.responseKeys = responseKeys;
         this.read = read;
         this.update = update;
+        this.fault = null;
+    }
+
+    private ListResult(Id client, long requestId, TxnId txnId, Fault fault)
+    {
+        this.client = client;
+        this.requestId = requestId;
+        this.txnId = txnId;
+        this.readKeys = null;
+        this.responseKeys = null;
+        this.read = null;
+        this.update = null;
+        this.fault = fault;
+    }
+
+    public static ListResult heartBeat(Id client, long requestId, TxnId txnId)
+    {
+        return new ListResult(client, requestId, txnId, Fault.HeartBeat);
+    }
+
+    public static ListResult invalidated(Id client, long requestId, TxnId txnId)
+    {
+        return new ListResult(client, requestId, txnId, Fault.Invalidated);
+    }
+
+    public static ListResult lost(Id client, long requestId, TxnId txnId)
+    {
+        return new ListResult(client, requestId, txnId, Fault.Lost);
+    }
+
+    public static ListResult other(Id client, long requestId, TxnId txnId)
+    {
+        return new ListResult(client, requestId, txnId, Fault.Other);
+    }
+
+    public static ListResult truncated(Id client, long requestId, TxnId txnId)
+    {
+        return new ListResult(client, requestId, txnId, Fault.Truncated);
+    }
+
+    public static ListResult failure(Id client, long requestId, TxnId txnId)
+    {
+        return new ListResult(client, requestId, txnId, Fault.Failure);
     }
 
     @Override
@@ -57,6 +102,18 @@
         return null;
     }
 
+    public boolean isSuccess()
+    {
+        return fault == null;
+    }
+
+    public Fault fault()
+    {
+        if (isSuccess())
+            throw new IllegalStateException("Unable to find fault with successful results");
+        return fault;
+    }
+
     @Override
     public String toString()
     {
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 8c1996a..4d04d51 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -131,7 +131,7 @@
                              SizeOfIntersectionSorter.SUPPLIER,
                              SimpleProgressLog::new,
                              InMemoryCommandStores.SingleThread::new);
-        awaitUninterruptibly(node.start());
+        awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(topology, true);
         return node;
     }
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 879dec7..365ceec 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -113,7 +113,7 @@
                         clock, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
                         () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
                         SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
-        awaitUninterruptibly(node.start());
+        awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(storeSupport.local.get(), true);
         return node;
     }
diff --git a/accord-core/src/test/java/accord/utils/Gen.java b/accord-core/src/test/java/accord/utils/Gen.java
index 62da8c5..af86340 100644
--- a/accord-core/src/test/java/accord/utils/Gen.java
+++ b/accord-core/src/test/java/accord/utils/Gen.java
@@ -21,8 +21,18 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.IntPredicate;
+import java.util.function.IntSupplier;
+import java.util.function.IntUnaryOperator;
 import java.util.function.LongPredicate;
+import java.util.function.LongSupplier;
+import java.util.function.LongUnaryOperator;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
 
 public interface Gen<A> {
     /**
@@ -46,6 +56,16 @@
         return r -> fn.apply(r, this.next(r));
     }
 
+    default IntGen mapToInt(ToIntFunction<A> fn)
+    {
+        return r -> fn.applyAsInt(next(r));
+    }
+
+    default LongGen mapToLong(ToLongFunction<A> fn)
+    {
+        return r -> fn.applyAsLong(next(r));
+    }
+
     default <B> Gen<B> flatMap(Function<? super A, Gen<? extends B>> mapper)
     {
         return rs -> mapper.apply(this.next(rs)).next(rs);
@@ -69,6 +89,16 @@
         };
     }
 
+    default Supplier<A> asSupplier(RandomSource rs)
+    {
+        return () -> next(rs);
+    }
+
+    default Stream<A> asStream(RandomSource rs)
+    {
+        return Stream.generate(() -> next(rs));
+    }
+
     interface IntGen extends Gen<Integer>
     {
         int nextInt(RandomSource random);
@@ -79,7 +109,12 @@
             return nextInt(random);
         }
 
-        default Gen.IntGen filterInt(IntPredicate fn)
+        default IntGen mapAsInt(IntUnaryOperator fn)
+        {
+            return r -> fn.applyAsInt(nextInt(r));
+        }
+
+        default Gen.IntGen filterAsInt(IntPredicate fn)
         {
             return rs -> {
                 int value;
@@ -95,7 +130,17 @@
         @Override
         default Gen.IntGen filter(Predicate<Integer> fn)
         {
-            return filterInt(i -> fn.test(i));
+            return filterAsInt(i -> fn.test(i));
+        }
+
+        default IntSupplier asIntSupplier(RandomSource rs)
+        {
+            return () -> nextInt(rs);
+        }
+
+        default IntStream asIntStream(RandomSource rs)
+        {
+            return IntStream.generate(() -> nextInt(rs));
         }
     }
 
@@ -109,7 +154,12 @@
             return nextLong(random);
         }
 
-        default Gen.LongGen filterLong(LongPredicate fn)
+        default LongGen mapAsLong(LongUnaryOperator fn)
+        {
+            return r -> fn.applyAsLong(nextLong(r));
+        }
+
+        default Gen.LongGen filterAsLong(LongPredicate fn)
         {
             return rs -> {
                 long value;
@@ -125,7 +175,17 @@
         @Override
         default Gen.LongGen filter(Predicate<Long> fn)
         {
-            return filterLong(i -> fn.test(i));
+            return filterAsLong(i -> fn.test(i));
+        }
+
+        default LongSupplier asLongSupplier(RandomSource rs)
+        {
+            return () -> nextLong(rs);
+        }
+
+        default LongStream asLongStream(RandomSource rs)
+        {
+            return LongStream.generate(() -> nextLong(rs));
         }
     }
 }
diff --git a/accord-core/src/test/java/accord/utils/GenTest.java b/accord-core/src/test/java/accord/utils/GenTest.java
index c722335..7a04669 100644
--- a/accord-core/src/test/java/accord/utils/GenTest.java
+++ b/accord-core/src/test/java/accord/utils/GenTest.java
@@ -18,19 +18,25 @@
 
 package accord.utils;
 
+import org.agrona.collections.IntArrayList;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
 import static accord.utils.Property.qt;
+import static org.assertj.core.api.Assertions.assertThat;
 
 public class GenTest {
     @Test
@@ -97,4 +103,74 @@
             });
         });
     }
+
+    enum PickWeight {A, B, C}
+
+    @Test
+    public void pickWeight()
+    {
+        int samples = 1000;
+        Gen<PickWeight> enums = Gens.enums().allWithWeights(PickWeight.class, 81, 10, 1);
+        Gen<Map<PickWeight, Integer>> gen = rs -> {
+            Map<PickWeight, Integer> counts = new EnumMap<>(PickWeight.class);
+            for (int i = 0; i < samples; i++)
+                counts.compute(enums.next(rs), (ignore, accum) -> accum == null ? 1 : accum + 1);
+            return counts;
+        };
+        qt().forAll(gen).check(counts -> {
+            // expected 810
+            assertThat(counts.get(PickWeight.A)).isGreaterThan(counts.get(PickWeight.B));
+            // expected 100
+            assertThat(counts.get(PickWeight.B))
+                    .isBetween(50, 200);
+
+            if (counts.containsKey(PickWeight.C))
+            {
+                assertThat(counts.get(PickWeight.B))
+                        .isGreaterThan(counts.get(PickWeight.C));
+
+                // expected 10
+                assertThat(counts.get(PickWeight.C))
+                        .isBetween(1, 60);
+            }
+        });
+    }
+
+    @Test
+    public void runs()
+    {
+        double ratio = 0.0625;
+        int samples = 1000;
+        Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio)).ofSize(samples).map(Runs::new);
+        qt().forAll(gen).check(runs -> {
+            assertThat(IntStream.of(runs.runs).filter(i -> i > 5).toArray()).isNotEmpty();
+            assertThat(runs.counts.get(true) / 1000.0).isBetween(ratio * .5, 0.1);
+        });
+    }
+
+    private static class Runs
+    {
+        private final Map<Boolean, Long> counts;
+        private final int[] runs;
+
+        Runs(List<Boolean> samples)
+        {
+            this.counts = samples.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+            IntArrayList runs = new IntArrayList();
+            int run = -1;
+            for (boolean b : samples)
+            {
+                if (b)
+                {
+                    run = run == -1 ? 1 : run + 1;
+                }
+                else if (run != -1)
+                {
+                    runs.add(run);
+                    run = -1;
+                }
+            }
+            this.runs = runs.toIntArray();
+        }
+    }
 }
diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java
index 4ccda46..244cd64 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -170,7 +170,7 @@
             return RandomSource::nextBoolean;
         }
 
-        public Gen<Boolean> runs(double ratio)
+        public Gen<Boolean> biasedRepeatingRuns(double ratio)
         {
             Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to be larger than 0 and <= 1", ratio);
             int steps = (int) (1 / ratio);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index cd7741b..f988703 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -321,7 +321,7 @@
                                           SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
             }
 
-            AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::start).collect(toList()), (a, b) -> null).beginAsResult();
+            AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult();
             while (sinks.processPending());
             if (!startup.isDone()) throw new AssertionError();
 
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 1600e03..a191904 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -177,7 +177,7 @@
                           MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                           MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
                           SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
-            awaitUninterruptibly(on.start());
+            awaitUninterruptibly(on.unsafeStart());
             err.println("Initialized node " + init.self);
             err.flush();
             sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index f77a0c0..109dd72 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -49,6 +49,10 @@
     jvmArgs += ['-XX:+HeapDumpOnOutOfMemoryError', "-XX:HeapDumpPath=${buildDir}"]
 }
 
+tasks.withType(Test) {
+  jvmArgs '-XX:+HeapDumpOnOutOfMemoryError'
+}
+
 dependencies {
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
     testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.0'