CEP-15: (C*) Improve the chaos generation for Burn Tests: slow/flakey connections and dropped messages (#57)
patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-18451
diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index d8ebab8..d33c123 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -269,6 +269,13 @@
}
long lastAcked = epochs.lastAcknowledged;
+ // TODO (now, review): observed lastAcked == 0 while lastReceived == 2
+ // waiting for epoch=1 to be acknowledged appears to hang forever (the burn test does not seem to ack epoch=1), so wait on minEpoch() instead
+ if (lastAcked == 0 && lastReceived > 0)
+ {
+ epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() -> reportTopology(topology, startSync));
+ return;
+ }
if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
{
epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology, startSync));
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 1916f0b..276f631 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -169,10 +169,18 @@
configService.registerListener(this);
}
- // TODO (cleanup, testing): remove, only used by Maelstrom
- public AsyncResult<Void> start()
+ /**
+ * This starts the node for tests and makes sure that the provided topology is acknowledged correctly. This method is not
+ * safe for production systems as it doesn't handle restarts and partially acknowledged histories
+ * @return {@link EpochReady#metadata}
+ */
+ @VisibleForTesting
+ public AsyncResult<Void> unsafeStart()
{
- return onTopologyUpdateInternal(configService.currentTopology(), false).metadata;
+ EpochReady ready = onTopologyUpdateInternal(configService.currentTopology(), false);
+ ready.coordination.addCallback(() -> this.topology.onEpochSyncComplete(id, topology.epoch()));
+ configService.acknowledgeEpoch(ready, false);
+ return ready.metadata;
}
public CommandStores commandStores()
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java
index cd569ac..0ef8f55 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -23,12 +23,15 @@
import accord.primitives.Keys;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
+import com.google.common.collect.Iterators;
import net.nicoulaj.compilecommand.annotations.Inline;
import com.google.common.collect.Sets;
+import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Nullable;
@@ -52,6 +55,28 @@
*/
default Collection<TxnId> additionalTxnIds() { return Collections.emptyList(); }
+ default Collection<TxnId> txnIds()
+ {
+ TxnId primaryTxnId = primaryTxnId();
+ Collection<TxnId> additional = additionalTxnIds();
+ if (primaryTxnId == null) return additional;
+ if (additional.isEmpty()) return Collections.singleton(primaryTxnId);
+ return new AbstractCollection<TxnId>()
+ {
+ @Override
+ public Iterator<TxnId> iterator()
+ {
+ return Iterators.concat(Iterators.singletonIterator(primaryTxnId), additional.iterator());
+ }
+
+ @Override
+ public int size()
+ {
+ return 1 + additional.size();
+ }
+ };
+ }
+
@Inline
default void forEachId(Consumer<TxnId> consumer)
{
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
index 1554e5d..44abe4f 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -79,7 +79,7 @@
}
@Override
- public synchronized AcceptReply apply(SafeCommandStore safeStore)
+ public AcceptReply apply(SafeCommandStore safeStore)
{
// TODO (now): we previously checked isAffectedByBootstrap(txnId) here and took this branch also, try to remember why
if (minUnsyncedEpoch < txnId.epoch())
diff --git a/accord-core/src/main/java/accord/messages/SafeCallback.java b/accord-core/src/main/java/accord/messages/SafeCallback.java
index a8a1416..0f3cd3a 100644
--- a/accord-core/src/main/java/accord/messages/SafeCallback.java
+++ b/accord-core/src/main/java/accord/messages/SafeCallback.java
@@ -55,6 +55,11 @@
failure(to, new Timeout(null, null));
}
+ public void onCallbackFailure(Node.Id to, Throwable t)
+ {
+ safeCall(to, t, Callback::onCallbackFailure);
+ }
+
private interface SafeCall<T, P>
{
void accept(Callback<T> callback, Node.Id id, P param) throws Throwable;
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 256e5f8..ec1e81e 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -152,7 +152,7 @@
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
InMemoryCommandStores.Synchronized::new);
- awaitUninterruptibly(node.start());
+ awaitUninterruptibly(node.unsafeStart());
return node;
}
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index ca63eb0..d8bb0ae 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -40,7 +40,10 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
+import accord.burn.random.FrequentLargeRange;
+import accord.impl.MessageListener;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +56,6 @@
import accord.impl.basic.RandomDelayQueue.Factory;
import accord.impl.TopologyFactory;
import accord.impl.basic.Packet;
-import accord.impl.basic.PendingQueue;
import accord.impl.basic.SimulatedDelayedExecutorService;
import accord.impl.list.ListAgent;
import accord.impl.list.ListQuery;
@@ -80,7 +82,7 @@
{
private static final Logger logger = LoggerFactory.getLogger(BurnTest.class);
- static List<Packet> generate(RandomSource random, Function<? super CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int keyCount, int operations)
+ static List<Packet> generate(RandomSource random, MessageListener listener, Function<? super CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int keyCount, int operations)
{
List<Key> keys = new ArrayList<>();
for (int i = 0 ; i < keyCount ; ++i)
@@ -108,7 +110,7 @@
Ranges ranges = Ranges.of(requestRanges.toArray(new Range[0]));
ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, ranges, ranges);
ListQuery query = new ListQuery(client, count);
- ListRequest request = new ListRequest(new Txn.InMemory(ranges, read, query, null));
+ ListRequest request = new ListRequest(new Txn.InMemory(ranges, read, query, null), listener);
packets.add(new Packet(client, node, count, request));
@@ -135,7 +137,7 @@
requestKeys.addAll(update.keySet());
ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, readKeys, new Keys(requestKeys));
ListQuery query = new ListQuery(client, count);
- ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update));
+ ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update), listener);
packets.add(new Packet(client, node, count, request));
}
}
@@ -192,7 +194,7 @@
{
List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
RandomDelayQueue delayQueue = new Factory(random).get();
- PendingQueue queue = new PropagatingPendingQueue(failures, delayQueue);
+ PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, delayQueue);
RandomSource retryRandom = random.fork();
ListAgent agent = new ListAgent(1000L, failures::add, retry -> {
long delay = retryRandom.nextInt(1, 15);
@@ -200,16 +202,23 @@
});
Supplier<LongSupplier> nowSupplier = () -> {
- RandomSource jitter = random.fork();
- // TODO (expected): jitter should be random walk with intermittent long periods
- return () -> Math.max(0, delayQueue.nowInMillis() + (jitter.nextInt(10) - 5));
+ RandomSource forked = random.fork();
+ return FrequentLargeRange.builder(forked)
+ .ratio(1, 5)
+ .small(50, 5000, TimeUnit.MICROSECONDS)
+ .large(1, 10, TimeUnit.MILLISECONDS)
+ .build()
+ .mapAsLong(j -> Math.max(0, queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j))) // builder ranges are stored in nanos; convert before adding to the millis clock
+ .asLongSupplier(forked);
};
StrictSerializabilityVerifier strictSerializable = new StrictSerializabilityVerifier(keyCount);
- SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent, random.fork());
+ SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent);
Function<CommandStore, AsyncExecutor> executor = ignore -> globalExecutor;
- Packet[] requests = toArray(generate(random, executor, clients, nodes, keyCount, operations), Packet[]::new);
+ MessageListener listener = MessageListener.get();
+
+ Packet[] requests = toArray(generate(random, listener, executor, clients, nodes, keyCount, operations), Packet[]::new);
int[] starts = new int[requests.length];
Packet[] replies = new Packet[requests.length];
@@ -239,14 +248,14 @@
{
int i = requestIndex.getAndIncrement();
starts[i] = clock.incrementAndGet();
- queue.add(requests[i]);
+ queue.addNoDelay(requests[i]);
if (i == requests.length - 1)
onSubmitted.get().run();
}
try
{
- if (reply.responseKeys == null && reply.read != null && reply.read.length == 0)
+ if (!reply.isSuccess() && reply.fault() == ListResult.Fault.HeartBeat)
return; // interrupted; will fetch our actual reply once rest of simulation is finished (but wanted to send another request to keep correct number in flight)
int start = starts[(int)packet.replyId];
@@ -254,13 +263,19 @@
logger.debug("{} at [{}, {}]", reply, start, end);
replies[(int)packet.replyId] = packet;
- if (reply.responseKeys == null)
+ if (!reply.isSuccess())
{
- if (reply.read == null) nacks.incrementAndGet();
- else if (reply.read.length == 1) lost.incrementAndGet();
- else if (reply.read.length == 2) truncated.incrementAndGet();
- else if (reply.read.length == 3) failedToCheck.incrementAndGet();
- else throw new AssertionError();
+
+ switch (reply.fault())
+ {
+ case Lost: lost.incrementAndGet(); break;
+ case Invalidated: nacks.incrementAndGet(); break;
+ case Failure: failedToCheck.incrementAndGet(); break;
+ case Truncated: truncated.incrementAndGet(); break;
+ // txn was applied?, but client saw a timeout, so response isn't known
+ case Other: break;
+ default: throw new AssertionError("Unexpected fault: " + reply.fault());
+ }
return;
}
@@ -292,7 +307,7 @@
EnumMap<MessageType, Cluster.Stats> messageStatsMap;
try
{
- messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), () -> queue,
+ messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), listener, () -> queue, queue::checkFailures,
responseSink, globalExecutor,
random::fork, nowSupplier,
topologyFactory, initialRequests::poll,
@@ -313,12 +328,18 @@
logger.info("Message counts: {}", messageStatsMap.entrySet());
if (clock.get() != operations * 2)
{
+ StringBuilder sb = new StringBuilder();
for (int i = 0 ; i < requests.length ; ++i)
{
- logger.info("{}", requests[i]);
- logger.info("\t\t" + replies[i]);
+ // since this only happens when operations are lost, only log the ones without a reply to lower the amount of noise
+ if (replies[i] == null)
+ {
+ sb.setLength(0);
+ sb.append(requests[i]).append("\n\t\t").append(replies[i]);
+ logger.info(sb.toString());
+ }
}
- throw new AssertionError("Incomplete set of responses");
+ throw new AssertionError("Incomplete set of responses; clock=" + clock.get() + ", expected operations=" + (operations * 2));
}
}
@@ -355,6 +376,7 @@
}
@Test
+ @Timeout(value = 3, unit = TimeUnit.MINUTES)
public void testOne()
{
run(ThreadLocalRandom.current().nextLong(), 1000);
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index b196023..ae3d3f0 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -61,6 +61,9 @@
public synchronized void syncComplete(Node originator, Collection<Node.Id> cluster, long epoch)
{
+ // nothing is pending, so this sync is for the initial topology; there is nothing to update
+ if (pendingTopologies.isEmpty())
+ return;
Map<Node.Id, Ranges> pending = pendingTopologies.get(epoch);
if (pending == null || null == pending.remove(originator.id()))
throw new AssertionError();
diff --git a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
index 4241798..41cae46 100644
--- a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
+++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
@@ -18,59 +18,121 @@
package accord.burn.random;
-import accord.utils.Invariants;
+import accord.utils.Gen;
+import accord.utils.Gen.LongGen;
+import accord.utils.Gens;
import accord.utils.RandomSource;
-public class FrequentLargeRange implements RandomLong
-{
- private final RandomLong small, large;
- private final double ratio;
- private final int steps;
- private final double lower, upper;
- private int run = -1;
- private long smallCount = 0, largeCount = 0;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
- public FrequentLargeRange(RandomLong small, RandomLong large, double ratio)
+public class FrequentLargeRange implements LongGen
+{
+ private final LongGen small, large;
+ private final Gen<Boolean> runs;
+
+ public FrequentLargeRange(LongGen small, LongGen large, double ratio)
{
- Invariants.checkArgument(ratio > 0 && ratio <= 1);
this.small = small;
this.large = large;
- this.ratio = ratio;
- this.steps = (int) (1 / ratio);
- this.lower = ratio * .8;
- this.upper = ratio * 1.2;
+ this.runs = Gens.bools().biasedRepeatingRuns(ratio);
}
@Override
- public long getLong(RandomSource randomSource)
+ public long nextLong(RandomSource randomSource)
{
- if (run != -1)
+ if (runs.next(randomSource)) return large.nextLong(randomSource);
+ else return small.nextLong(randomSource);
+ }
+
+ public static Builder builder(RandomSource randomSource)
+ {
+ return new Builder(randomSource);
+ }
+
+ public static class Builder
+ {
+ private final RandomSource random;
+ private Double ratio;
+ private LongGen small, large;
+
+ public Builder(RandomSource random)
{
- run--;
- largeCount++;
- return large.getLong(randomSource);
+ this.random = random;
}
- double currentRatio = largeCount / (double) (smallCount + largeCount);
- if (currentRatio < lower)
+
+ public Builder ratio(double ratio)
{
- // not enough large
- largeCount++;
- return large.getLong(randomSource);
+ this.ratio = ratio;
+ return this;
}
- if (currentRatio > upper)
+
+ public Builder ratio(int min, int max)
{
- // not enough small
- smallCount++;
- return small.getLong(randomSource);
+ this.ratio = random.nextInt(min, max) / 100.0D;
+ return this;
}
- if (randomSource.nextDouble() < ratio)
+
+ public Builder small(Duration min, Duration max)
{
- run = randomSource.nextInt(steps);
- run--;
- largeCount++;
- return large.getLong(randomSource);
+ small = create(min, max);
+ return this;
}
- smallCount++;
- return small.getLong(randomSource);
+
+ public Builder small(long min, long max, TimeUnit unit)
+ {
+ small = create(min, max, unit);
+ return this;
+ }
+
+ public Builder small(long min, TimeUnit minUnit, long max, TimeUnit maxUnit)
+ {
+ small = create(min, minUnit, max, maxUnit);
+ return this;
+ }
+
+ public Builder large(Duration min, Duration max)
+ {
+ large = create(min, max);
+ return this;
+ }
+
+ public Builder large(long min, long max, TimeUnit unit)
+ {
+ large = create(min, max, unit);
+ return this;
+ }
+
+ public Builder large(long min, TimeUnit minUnit, long max, TimeUnit maxUnit)
+ {
+ large = create(min, minUnit, max, maxUnit);
+ return this;
+ }
+
+ private RandomWalkRange create(Duration min, Duration max)
+ {
+ return new RandomWalkRange(random, min.toNanos(), max.toNanos());
+ }
+
+ private RandomWalkRange create(long min, long max, TimeUnit unit)
+ {
+ return create(min, unit, max, unit);
+ }
+
+ private RandomWalkRange create(long min, TimeUnit minUnit, long max, TimeUnit maxUnit)
+ {
+ return new RandomWalkRange(random, minUnit.toNanos(min), maxUnit.toNanos(max));
+ }
+
+ public FrequentLargeRange build()
+ {
+ if (small == null)
+ throw new IllegalStateException("Small range undefined");
+ if (large == null)
+ throw new IllegalStateException("Large range undefined");
+ if (ratio == null)
+ ratio(1, 11);
+ return new FrequentLargeRange(small, large, ratio);
+ }
}
}
diff --git a/accord-core/src/test/java/accord/burn/random/IntRange.java b/accord-core/src/test/java/accord/burn/random/IntRange.java
index 5ebc2cd..b9aec72 100644
--- a/accord-core/src/test/java/accord/burn/random/IntRange.java
+++ b/accord-core/src/test/java/accord/burn/random/IntRange.java
@@ -18,9 +18,10 @@
package accord.burn.random;
+import accord.utils.Gen;
import accord.utils.RandomSource;
-public class IntRange implements RandomInt
+public class IntRange implements Gen.LongGen
{
public final int min, max;
private final int maxDelta;
@@ -34,7 +35,7 @@
}
@Override
- public int getInt(RandomSource randomSource)
+ public long nextLong(RandomSource randomSource)
{
return min + randomSource.nextInt(maxDelta);
}
diff --git a/accord-core/src/test/java/accord/burn/random/RandomInt.java b/accord-core/src/test/java/accord/burn/random/RandomInt.java
deleted file mode 100644
index a8fbbd1..0000000
--- a/accord-core/src/test/java/accord/burn/random/RandomInt.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.burn.random;
-
-import accord.utils.RandomSource;
-
-public interface RandomInt extends RandomLong
-{
- int getInt(RandomSource randomSource);
-
- @Override
- default long getLong(RandomSource randomSource)
- {
- return getInt(randomSource);
- }
-}
diff --git a/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java b/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
index 68c134a..d25be7c 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
@@ -46,9 +46,9 @@
{
int samples = 1000;
qt().forAll(Gens.random(), ranges()).check((rs, range) -> {
- RandomLong randRange = factory.create(rs, range.min, range.max);
+ Gen.LongGen randRange = factory.create(rs, range.min, range.max);
for (int i = 0; i < samples; i++)
- Assertions.assertThat(randRange.getLong(rs)).isBetween((long) range.min, (long) range.max);
+ Assertions.assertThat(randRange.nextLong(rs)).isBetween((long) range.min, (long) range.max);
});
}
@@ -78,6 +78,6 @@
private interface Factory
{
- RandomLong create(RandomSource random, int min, int max);
+ Gen.LongGen create(RandomSource random, int min, int max);
}
}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java b/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
index c618cf7..ae3c98e 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
+++ b/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
@@ -18,15 +18,16 @@
package accord.burn.random;
+import accord.utils.Gen.LongGen;
import accord.utils.RandomSource;
-public class RandomWalkRange implements RandomLong
+public class RandomWalkRange implements LongGen
{
- public final int min, max;
- private final int maxStepSize;
+ public final long min, max;
+ private final long maxStepSize;
long cur;
- public RandomWalkRange(RandomSource random, int min, int max)
+ public RandomWalkRange(RandomSource random, long min, long max)
{
this.min = min;
this.max = max;
@@ -35,7 +36,7 @@
}
@Override
- public long getLong(RandomSource randomSource)
+ public long nextLong(RandomSource randomSource)
{
long step = randomSource.nextLong(-maxStepSize, maxStepSize + 1);
long cur = this.cur;
@@ -44,7 +45,7 @@
return cur;
}
- private static int maxStepSize(RandomSource random, int min, int max)
+ private static long maxStepSize(RandomSource random, long min, long max)
{
switch (random.nextInt(3))
{
diff --git a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java b/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
index caddc24..b16cdec 100644
--- a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
@@ -53,17 +53,17 @@
this.type = type;
}
- RandomLong min(RandomSource random)
+ Gen.LongGen min(RandomSource random)
{
return create(random, minSmall, maxSmall);
}
- RandomLong max(RandomSource random)
+ Gen.LongGen max(RandomSource random)
{
return create(random, minLarge, maxLarge);
}
- private RandomLong create(RandomSource random, int min, int max)
+ private Gen.LongGen create(RandomSource random, int min, int max)
{
switch (type)
{
@@ -127,7 +127,7 @@
int resamples = 0;
for (int i = 0; i < numSamples; i++)
{
- long size = period.getLong(rs);
+ long size = period.nextLong(rs);
if (size > tc.maxSmall)
{
largeCount++;
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
index bfd83fa..0c9bdea 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -101,7 +101,7 @@
{
System.out.println("seed: " + seed);
RandomSource random = new DefaultRandom(seed);
- SimulatedDelayedExecutorService executor = new SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new TestAgent(), random);
+ SimulatedDelayedExecutorService executor = new SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new TestAgent());
return topologies(random, executor).map(topologies -> constructor.apply(random, topologies))
.collect(Collectors.toList());
}
diff --git a/accord-core/src/test/java/accord/impl/MessageListener.java b/accord-core/src/test/java/accord/impl/MessageListener.java
new file mode 100644
index 0000000..faddb09
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/MessageListener.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.impl.basic.NodeSink;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.messages.Message;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+public interface MessageListener
+{
+ enum ClientAction {SUBMIT, SUCCESS, FAILURE, UNKNOWN}
+
+ void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id, Message message);
+
+ void onClientAction(ClientAction action, Node.Id from, TxnId id, Object message);
+
+ static MessageListener get()
+ {
+ if (!DebugListener.DEBUG) return Noop.INSTANCE;
+ return new DebugListener();
+ }
+
+ enum Noop implements MessageListener
+ {
+ INSTANCE;
+
+ @Override
+ public void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id, Message message)
+ {
+
+ }
+
+ @Override
+ public void onClientAction(ClientAction action, Node.Id from, TxnId id, Object message)
+ {
+
+ }
+ }
+
+ class DebugListener implements MessageListener
+ {
+ private static final Logger logger = LoggerFactory.getLogger(DebugListener.class);
+ /**
+ * When running tests, if this is enabled log events will happen for each matching message type
+ */
+ private static final boolean DEBUG = false;
+ /**
+ * When this set is empty all txn events will be logged, but if only specific txn are desired then this filter will limit the logging to just those events
+ */
+ private static final Set<TxnId> txnIdFilter = ImmutableSet.of();
+ private static final Set<TxnReplyId> txnReplies = new HashSet<>();
+
+ private static final int ACTION_SIZE = Stream.of(NodeSink.Action.values()).map(Enum::name).mapToInt(String::length).max().getAsInt();
+ private static final int CLIENT_ACTION_SIZE = Stream.of(ClientAction.values()).map(Enum::name).mapToInt(String::length).max().getAsInt();
+ private static final int ALL_ACTION_SIZE = Math.max(ACTION_SIZE, CLIENT_ACTION_SIZE); // common pad width used by both normalize(action) overloads
+
+ @Override
+ public void onMessage(NodeSink.Action action, Node.Id from, Node.Id to, long id, Message message)
+ {
+ if (txnIdFilter.isEmpty() || containsTxnId(from, to, id, message))
+ logger.debug("Message {}: From {}, To {}, id {}, Message {}", normalize(action), normalize(from), normalize(to), normalizeMessageId(id), message);
+ }
+
+ @Override
+ public void onClientAction(ClientAction action, Node.Id from, TxnId id, Object message)
+ {
+ if (txnIdFilter.isEmpty() || txnIdFilter.contains(id))
+ {
+ String log = message instanceof Throwable ?
+ "Client {}: From {}, To {}, id {}" :
+ "Client {}: From {}, To {}, id {}, Message {}";
+ logger.debug(log, normalize(action), normalize(from), normalize(from), normalize(id), normalizeClientMessage(message));
+ }
+ }
+
+ private static Object normalizeClientMessage(Object o)
+ {
+ if (o instanceof Throwable)
+ trimStackTrace((Throwable) o);
+ return o;
+ }
+
+ private static void trimStackTrace(Throwable input)
+ {
+ for (Throwable current = input; current != null; current = current.getCause())
+ {
+ StackTraceElement[] stack = current.getStackTrace();
+ // remove junit frames as they are super dense and not helpful
+ OptionalInt first = IntStream.range(0, stack.length).filter(i -> stack[i].getClassName().startsWith("org.junit")).findFirst();
+ if (first.isPresent())
+ current.setStackTrace(Arrays.copyOfRange(stack, 0, first.getAsInt()));
+ for (Throwable sup : current.getSuppressed())
+ trimStackTrace(sup);
+ }
+ }
+
+ private static String normalize(NodeSink.Action action)
+ {
+ return Strings.padStart(action.name(), ALL_ACTION_SIZE, ' ');
+ }
+
+ private static String normalize(ClientAction action)
+ {
+ return Strings.padStart(action.name(), ALL_ACTION_SIZE, ' ');
+ }
+
+ private static String normalize(Node.Id id)
+ {
+ return Strings.padStart(id.toString(), 4, ' ');
+ }
+
+ private static String normalizeMessageId(long id)
+ {
+ return Strings.padStart(Long.toString(id), 14, ' ');
+ }
+
+ private static String normalize(Timestamp ts)
+ {
+ return Strings.padStart(ts.toString(), 14, ' ');
+ }
+
+ public static boolean containsTxnId(Node.Id from, Node.Id to, long id, Message message)
+ {
+ if (message instanceof Request)
+ {
+ if (containsAny((Request) message))
+ {
+ txnReplies.add(new TxnReplyId(from, to, id));
+ return true;
+ }
+ return false;
+ }
+ else
+ return txnReplies.contains(new TxnReplyId(to, from, id));
+ }
+
+ private static boolean containsAny(Request message)
+ {
+ if (message instanceof TxnRequest<?>)
+ return txnIdFilter.contains(((TxnRequest<?>) message).txnId);
+ // this also matches txns that depend on a filtered txn; should this be limited to the primary txnId?
+ if (message instanceof PreLoadContext)
+ return ((PreLoadContext) message).txnIds().stream().anyMatch(txnIdFilter::contains);
+ return false;
+ }
+
+ private static class TxnReplyId
+ {
+ final Node.Id from;
+ final Node.Id to;
+ final long id;
+
+ private TxnReplyId(Node.Id from, Node.Id to, long id)
+ {
+ this.from = from;
+ this.to = to;
+ this.id = id;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TxnReplyId that = (TxnReplyId) o;
+ return id == that.id && Objects.equals(from, that.from) && Objects.equals(to, that.to);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(from, to, id);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TxnReplyId{" +
+ "from=" + from +
+ ", to=" + to +
+ ", id=" + id +
+ '}';
+ }
+ }
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 5dad1de..853dfe7 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -35,6 +35,7 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import accord.impl.MessageListener;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
import accord.messages.MessageType;
+import accord.messages.Message;
import accord.messages.Reply;
import accord.messages.Request;
import accord.messages.SafeCallback;
@@ -81,18 +83,24 @@
EnumMap<MessageType, Stats> statsMap = new EnumMap<>(MessageType.class);
+ final RandomSource randomSource;
final Function<Id, Node> lookup;
final PendingQueue pending;
+ final Runnable checkFailures;
final List<Runnable> onDone = new ArrayList<>();
final Consumer<Packet> responseSink;
final Map<Id, NodeSink> sinks = new HashMap<>();
+ final MessageListener messageListener;
int clock;
int recurring;
Set<Id> partitionSet;
- public Cluster(Supplier<PendingQueue> queueSupplier, Function<Id, Node> lookup, Consumer<Packet> responseSink)
+ public Cluster(RandomSource randomSource, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id, Node> lookup, Consumer<Packet> responseSink)
{
+ this.randomSource = randomSource;
+ this.messageListener = messageListener;
this.pending = queueSupplier.get();
+ this.checkFailures = checkFailures;
this.lookup = lookup;
this.responseSink = responseSink;
this.partitionSet = new HashSet<>();
@@ -105,26 +113,16 @@
return sink;
}
- private void add(Packet packet)
+ void add(Packet packet, long delay, TimeUnit unit)
{
MessageType type = packet.message.type();
if (type != null)
statsMap.computeIfAbsent(type, ignore -> new Stats()).count++;
- boolean isReply = packet.message instanceof Reply;
if (trace.isTraceEnabled())
- trace.trace("{} {} {}", clock++, isReply ? "RPLY" : "SEND", packet);
+ trace.trace("{} {} {}", clock++, packet.message instanceof Reply ? "RPLY" : "SEND", packet);
if (lookup.apply(packet.dst) == null) responseSink.accept(packet);
- else pending.add(packet);
- }
+ else pending.add(packet, delay, unit);
- void add(Id from, Id to, long messageId, Request send)
- {
- add(new Packet(from, to, messageId, send));
- }
-
- void add(Id from, Id to, long replyId, Reply send)
- {
- add(new Packet(from, to, replyId, send));
}
public void processAll()
@@ -139,6 +137,7 @@
public boolean processPending()
{
+ checkFailures.run();
if (pending.size() == recurring)
return false;
@@ -147,6 +146,7 @@
return false;
processNext(next);
+ checkFailures.run();
return true;
}
@@ -157,18 +157,6 @@
Packet deliver = (Packet) next;
Node on = lookup.apply(deliver.dst);
- // TODO (required, testing): random drop chance independent of partition; also port flaky connections etc. from simulator
- // Drop the message if it goes across the partition
- boolean drop = ((Packet) next).src.id >= 0 &&
- !(partitionSet.contains(deliver.src) && partitionSet.contains(deliver.dst)
- || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dst));
- if (drop)
- {
- if (trace.isTraceEnabled())
- trace.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver);
- return;
- }
-
if (trace.isTraceEnabled())
trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
@@ -190,6 +178,12 @@
}
}
+ /**
+ * Writes a trace-level {@code DROP} record for a message that is not being delivered.
+ * Nothing is logged, and {@code clock} is not advanced, unless trace logging is enabled.
+ * NOTE(review): {@code lookup.apply(to)} is dereferenced without a null check, so {@code to} is
+ * presumably always a known node here - confirm against callers.
+ *
+ * @param from sender of the dropped message
+ * @param to intended recipient; its current epoch is included in the record
+ * @param id logged under the label "replyTo" when {@code message} is a {@link Reply}, otherwise "id"
+ * @param message the message body that was dropped
+ */
+ public void notifyDropped(Node.Id from, Node.Id to, long id, Message message)
+ {
+ if (trace.isTraceEnabled())
+ trace.trace("{} DROP[{}] (from:{}, to:{}, {}:{}, body:{})", clock++, lookup.apply(to).epoch(), from, to, message instanceof Reply ? "replyTo" : "id", id, message);
+ }
+
@Override
public Scheduled recurring(Runnable run, long delay, TimeUnit units)
{
@@ -219,13 +213,13 @@
run.run();
}
- public static EnumMap<MessageType, Stats> run(Id[] nodes, Supplier<PendingQueue> queueSupplier, Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
+ public static EnumMap<MessageType, Stats> run(Id[] nodes, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
{
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> lookup = new LinkedHashMap<>();
try
{
- Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink);
+ Cluster sinks = new Cluster(randomSupplier.get(), messageListener, queueSupplier, checkFailures, lookup::get, responseSink);
TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
TopologyRandomizer configRandomizer = new TopologyRandomizer(randomSupplier, topology, topologyUpdates, lookup::get);
List<CoordinateDurabilityScheduling> durabilityScheduling = new ArrayList<>();
@@ -248,7 +242,7 @@
}
// startup
- AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::start).collect(toList()), (a, b) -> null).beginAsResult();
+ AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult();
while (sinks.processPending());
Assertions.assertTrue(startup.isDone());
@@ -270,7 +264,7 @@
Packet next;
while ((next = in.get()) != null)
- sinks.add(next);
+ sinks.add(next, 0, TimeUnit.NANOSECONDS);
while (sinks.processPending());
@@ -289,8 +283,9 @@
while (!sinks.onDone.isEmpty())
{
- sinks.onDone.forEach(Runnable::run);
+ List<Runnable> onDone = new ArrayList<>(sinks.onDone);
sinks.onDone.clear();
+ onDone.forEach(Runnable::run);
while (sinks.processPending());
}
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index cc026c8..78d76a7 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -49,7 +49,7 @@
public static CommandStores.Factory factory(PendingQueue pending)
{
return (time, agent, store, random, shardDistributor, progressLogFactory) ->
- new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, new SimulatedDelayedExecutorService(pending, agent, random));
+ new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, new SimulatedDelayedExecutorService(pending, agent));
}
public static class DelayedCommandStore extends InMemoryCommandStore
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index e14fb52..a19d696 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -18,13 +18,22 @@
package accord.impl.basic;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import accord.api.MessageSink;
import accord.local.AgentExecutor;
+import accord.burn.random.FrequentLargeRange;
+import accord.messages.SafeCallback;
+import accord.messages.Message;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.RandomSource;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Callback;
@@ -32,13 +41,16 @@
import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
import accord.messages.Request;
-import accord.messages.SafeCallback;
-import accord.utils.RandomSource;
import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
+import static java.util.concurrent.TimeUnit.SECONDS;
public class NodeSink implements MessageSink
{
+ public enum Action {DELIVER, DROP, DROP_PARTITIONED, DELIVER_WITH_FAILURE, FAILURE}
+
+ private final Map<Id, Supplier<Action>> nodeActions = new HashMap<>();
+ private final Map<Id, LongSupplier> networkJitter = new HashMap<>();
final Id self;
final Function<Id, Node> lookup;
final Cluster parent;
@@ -56,9 +68,9 @@
}
@Override
- public synchronized void send(Id to, Request send)
+ public void send(Id to, Request send)
{
- parent.add(self, to, SENTINEL_MESSAGE_ID, send);
+ maybeEnqueue(to, SENTINEL_MESSAGE_ID, send, null);
}
@Override
@@ -67,21 +79,116 @@
long messageId = nextMessageId++;
SafeCallback sc = new SafeCallback(executor, callback);
callbacks.put(messageId, sc);
- parent.add(self, to, messageId, send);
- parent.pending.add((PendingRunnable) () -> {
- if (sc == callbacks.get(messageId))
- sc.slowResponse(to);
- }, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
- parent.pending.add((PendingRunnable) () -> {
- if (sc == callbacks.remove(messageId))
- sc.timeout(to);
- }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+ if (maybeEnqueue(to, messageId, send, sc))
+ {
+ parent.pending.add((PendingRunnable) () -> {
+ if (sc == callbacks.get(messageId))
+ sc.slowResponse(to);
+ }, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
+ parent.pending.add((PendingRunnable) () -> {
+ if (sc == callbacks.remove(messageId))
+ sc.timeout(to);
+ }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+ }
}
@Override
public void reply(Id replyToNode, ReplyContext replyContext, Reply reply)
{
- parent.add(self, replyToNode, Packet.getMessageId(replyContext), reply);
+ maybeEnqueue(replyToNode, Packet.getMessageId(replyContext), reply, null);
+ }
+
+ /**
+ * Decides the fate of one outgoing message (deliver, drop, or fail) and acts on it.
+ * Messages to self, or to an id that {@code lookup} does not know (a client), are always delivered.
+ * Otherwise the action is DROP_PARTITIONED when {@link #partitioned(Id)} holds, else the next value
+ * from the per-destination action supplier. Every decision is reported to {@code parent.messageListener}.
+ *
+ * @param to destination node
+ * @param id message id, or the id being replied to when {@code message} is a Reply
+ * @param message the Request or Reply to send
+ * @param callback callback to fail for FAILURE / DELIVER_WITH_FAILURE; may be null
+ * @return true for DELIVER, DROP and DROP_PARTITIONED (and for self/client delivery); false for FAILURE and
+ * DELIVER_WITH_FAILURE. The callback-taking send in this class schedules its slowResponse/timeout
+ * tasks only when this returns true.
+ */
+ private boolean maybeEnqueue(Node.Id to, long id, Message message, SafeCallback callback)
+ {
+ // wraps the message in a Packet and hands it to the cluster with a per-destination network delay
+ Runnable task = () -> {
+ Packet packet;
+ if (message instanceof Reply) packet = new Packet(self, to, id, (Reply) message);
+ else packet = new Packet(self, to, id, (Request) message);
+ parent.add(packet, networkJitterNanos(to), TimeUnit.NANOSECONDS);
+ };
+ if (to.equals(self) || lookup.apply(to) == null /* client */)
+ {
+ parent.messageListener.onMessage(Action.DELIVER, self, to, id, message);
+ task.run();
+ return true;
+ }
+
+ Action action = partitioned(to) ? Action.DROP_PARTITIONED
+ // call actions() per node so each one has different "runs" state
+ : nodeActions.computeIfAbsent(to, ignore -> actions()).get();
+ parent.messageListener.onMessage(action, self, to, id, message);
+ switch (action)
+ {
+ case DELIVER:
+ task.run();
+ return true;
+ case DELIVER_WITH_FAILURE:
+ task.run();
+ // deliberate fall-through: the message is delivered AND the sender is told it failed
+ case FAILURE:
+
+ // only a pure FAILURE counts as a drop; DELIVER_WITH_FAILURE was enqueued above
+ if (action == Action.FAILURE)
+ parent.notifyDropped(self, to, id, message);
+ if (callback != null)
+ {
+ parent.pending.add((PendingRunnable) () -> {
+ // fire only if this callback is still the one registered under id
+ if (callback == callbacks.remove(id))
+ {
+ try
+ {
+ callback.failure(to, new SimulatedFault("Simulation Failure; src=" + self + ", to=" + to + ", id=" + id + ", message=" + message));
+ }
+ catch (Throwable t)
+ {
+ callback.onCallbackFailure(to, t);
+ lookup.apply(self).agent().onUncaughtException(t);
+ }
+ }
+ }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+ }
+ return false;
+ case DROP_PARTITIONED:
+ case DROP:
+ // TODO (consistency): parent.notifyDropped is a trace logger that is very similar in spirit to MessageListener; can we unify?
+ parent.notifyDropped(self, to, id, message);
+ return true;
+ default:
+ throw new AssertionError("Unexpected action: " + action);
+ }
+ }
+
+ // Delay for one message to dst, drawn from that destination's jitter source (created on first use).
+ private long networkJitterNanos(Node.Id dst)
+ {
+ LongSupplier jitter = networkJitter.computeIfAbsent(dst, ignore -> defaultJitter());
+ return jitter.getAsLong();
+ }
+
+ // Builds a new jitter source driven by this sink's random.
+ // NOTE(review): presumably small(...) spans 500us-5ms, large(...) spans 50ms-5s and ratio(1, 5) sets how
+ // often the large range is used; FrequentLargeRange's builder is not visible here - confirm.
+ private LongSupplier defaultJitter()
+ {
+ return FrequentLargeRange.builder(random)
+ .ratio(1, 5)
+ .small(500, TimeUnit.MICROSECONDS, 5, TimeUnit.MILLISECONDS)
+ .large(50, TimeUnit.MILLISECONDS, 5, SECONDS)
+ .build()
+ .asLongSupplier(random);
+ }
+
+ // True when exactly one of self and to is in the parent's partition set, i.e. they sit on opposite sides.
+ private boolean partitioned(Id to)
+ {
+ boolean selfInside = parent.partitionSet.contains(self);
+ boolean peerInside = parent.partitionSet.contains(to);
+ return selfInside ^ peerInside;
+ }
+
+ // Builds a new action source backed by this sink's random. Drops are checked first; otherwise a failure
+ // is split evenly between FAILURE and DELIVER_WITH_FAILURE, and everything else is DELIVER.
+ private Supplier<Action> actions()
+ {
+ Gen<Boolean> dropRuns = Gens.bools().biasedRepeatingRuns(0.01);
+ Gen<Boolean> failureRuns = Gens.bools().biasedRepeatingRuns(0.01);
+ Gen<Action> chooser = rs -> {
+ if (dropRuns.next(rs))
+ return Action.DROP;
+ if (!failureRuns.next(rs))
+ return Action.DELIVER;
+ return rs.nextBoolean() ? Action.FAILURE : Action.DELIVER_WITH_FAILURE;
+ };
+ return chooser.asSupplier(random);
 }
@Override
diff --git a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
index e070dd8..fd4932d 100644
--- a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
@@ -23,6 +23,7 @@
public interface PendingQueue
{
void add(Pending item);
+ void addNoDelay(Pending item);
void add(Pending item, long delay, TimeUnit units);
boolean remove(Pending item);
Pending poll();
diff --git a/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java b/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
index a45b53e..bd25ac8 100644
--- a/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
@@ -39,6 +39,12 @@
}
@Override
+ public void addNoDelay(Pending item)
+ {
+ wrapped.addNoDelay(item);
+ }
+
+ @Override
public void add(Pending item, long delay, TimeUnit units)
{
wrapped.add(item, delay, units);
@@ -53,6 +59,12 @@
@Override
public Pending poll()
{
+ checkFailures();
+ return wrapped.poll();
+ }
+
+ public void checkFailures()
+ {
if (!failures.isEmpty())
{
AssertionError assertion = null;
@@ -74,8 +86,6 @@
}
throw assertion;
}
-
- return wrapped.poll();
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
index b0cec4d..85eb236 100644
--- a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
@@ -18,12 +18,14 @@
package accord.impl.basic;
+import accord.burn.random.FrequentLargeRange;
+import accord.utils.RandomSource;
+
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
-import accord.utils.RandomSource;
-
public class RandomDelayQueue implements PendingQueue
{
public static class Factory implements Supplier<RandomDelayQueue>
@@ -87,25 +89,38 @@
}
final PriorityQueue<Item> queue = new PriorityQueue<>();
- final RandomSource random;
+ private final LongSupplier jitterMillis;
long now;
int seq;
RandomDelayQueue(RandomSource random)
{
- this.random = random;
+ this.jitterMillis = FrequentLargeRange.builder(random)
+ .small(0, 50, TimeUnit.MICROSECONDS)
+ .large(50, TimeUnit.MICROSECONDS, 5, TimeUnit.MILLISECONDS)
+ .build()
+ .mapAsLong(TimeUnit.NANOSECONDS::toMillis)
+ .asLongSupplier(random);
}
@Override
public void add(Pending item)
{
- add(item, random.nextInt(500), TimeUnit.MILLISECONDS);
+ add(item, 0, TimeUnit.NANOSECONDS);
+ }
+
+ // Queues item at the queue's current time (now): unlike add(item, delay, units), neither a delay nor jitter is added.
+ @Override
+ public void addNoDelay(Pending item)
+ {
+ queue.add(new Item(now, seq++, item));
 }
@Override
public void add(Pending item, long delay, TimeUnit units)
{
- queue.add(new Item(now + units.toMillis(delay), seq++, item));
+ if (delay < 0)
+ throw new IllegalArgumentException("Delay must be positive or 0, but given " + delay);
+ queue.add(new Item(now + units.toMillis(delay) + jitterMillis.getAsLong(), seq++, item));
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 2f3bd1b..d1971b5 100644
--- a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -26,10 +26,6 @@
import java.util.concurrent.TimeUnit;
import accord.api.Agent;
-import accord.burn.random.FrequentLargeRange;
-import accord.burn.random.RandomLong;
-import accord.burn.random.RandomWalkRange;
-import accord.utils.RandomSource;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -134,42 +130,23 @@
private final PendingQueue pending;
private final Agent agent;
- private final RandomSource random;
- private final RandomLong jitterInNano;
private long sequenceNumber;
- public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent, RandomSource random)
+ public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent)
{
this.pending = pending;
this.agent = agent;
- this.random = random;
- // this is different from Apache Cassandra Simulator as this is computed differently for each executor
- // rather than being a global config
- double ratio = random.nextInt(1, 11) / 100.0D;
- this.jitterInNano = new FrequentLargeRange(new RandomWalkRange(random, microToNanos(0), microToNanos(50)),
- new RandomWalkRange(random, microToNanos(50), msToNanos(5)),
- ratio);
- }
-
- private static int msToNanos(int value)
- {
- return Math.toIntExact(TimeUnit.MILLISECONDS.toNanos(value));
- }
-
- private static int microToNanos(int value)
- {
- return Math.toIntExact(TimeUnit.MICROSECONDS.toNanos(value));
}
@Override
public void execute(Task<?> task)
{
- pending.add(task, jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
+ pending.add(task);
}
private void schedule(Task<?> task, long delay, TimeUnit unit)
{
- pending.add(task, unit.toNanos(delay) + jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
+ pending.add(task, delay, unit);
}
@Override
diff --git a/accord-core/src/test/java/accord/burn/random/RandomLong.java b/accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
similarity index 82%
rename from accord-core/src/test/java/accord/burn/random/RandomLong.java
rename to accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
index 1bc8c48..3af4ead 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomLong.java
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
@@ -16,11 +16,12 @@
* limitations under the License.
*/
-package accord.burn.random;
+package accord.impl.basic;
-import accord.utils.RandomSource;
-
-public interface RandomLong
+public class SimulatedFault extends AssertionError
{
- long getLong(RandomSource randomSource);
+ public SimulatedFault(Object detailMessage)
+ {
+ super(detailMessage);
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 5fb3924..1a9611c 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -68,7 +68,7 @@
@Override
public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next)
{
- throw new AssertionError("Inconsistent execution timestamp detected for txnId " + command.txnId() + ": " + prev + " != " + next);
+ throw new AssertionError("Inconsistent execution timestamp detected for command " + command + ": " + prev + " != " + next);
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index c6127a3..2f2084a 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -18,18 +18,20 @@
package accord.impl.list;
-import java.util.function.BiConsumer;
-
import accord.api.Result;
import accord.api.RoutingKey;
import accord.coordinate.CheckShards;
import accord.coordinate.CoordinationFailed;
import accord.coordinate.Invalidated;
import accord.coordinate.Truncated;
+import accord.coordinate.Timeout;
+import accord.impl.MessageListener;
import accord.impl.basic.Cluster;
import accord.impl.basic.Packet;
+import accord.impl.basic.SimulatedFault;
import accord.local.Node;
import accord.local.Node.Id;
+import accord.local.SaveStatus;
import accord.local.Status;
import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.IncludeInfo;
@@ -40,7 +42,11 @@
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import javax.annotation.Nullable;
+
import static accord.local.Status.Phase.Cleanup;
+import java.util.function.BiConsumer;
+
import static accord.local.Status.PreApplied;
import static accord.local.Status.PreCommitted;
@@ -68,6 +74,10 @@
protected Action checkSufficient(Id from, CheckStatusOk ok)
{
++count;
+ // this method is called for each reply, so if we see a reply where the status is not known, it may be known on others;
+ // once all statuses are merged, onDone will apply additional logic to make sure things are safe.
+ if (ok.saveStatus == SaveStatus.Uninitialised)
+ return Action.ApproveIfQuorum;
return ok.saveStatus.hasBeen(PreApplied) ? Action.Approve : Action.Reject;
}
@@ -94,74 +104,125 @@
final Node node;
final Id client;
final ReplyContext replyContext;
+ final MessageListener listener;
+ final TxnId id;
final Txn txn;
- ResultCallback(Node node, Id client, ReplyContext replyContext, Txn txn)
+ ResultCallback(Node node, Id client, ReplyContext replyContext, MessageListener listener, TxnId id, Txn txn)
{
this.node = node;
this.client = client;
this.replyContext = replyContext;
+ this.listener = listener;
+ this.id = id;
this.txn = txn;
}
@Override
public void accept(Result success, Throwable fail)
{
- if (fail instanceof CoordinationFailed)
+ if (fail != null)
{
- RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
- TxnId txnId = ((CoordinationFailed) fail).txnId();
- if (fail instanceof Invalidated)
+ listener.onClientAction(MessageListener.ClientAction.FAILURE, node.id(), id, fail);
+ if (fail instanceof CoordinationFailed)
{
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
- return;
- }
+ RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
+ TxnId txnId = ((CoordinationFailed) fail).txnId();
+ if (fail instanceof Invalidated)
+ {
+ node.reply(client, replyContext, ListResult.invalidated(client, ((Packet)replyContext).requestId, txnId), null);
+ return;
+ }
- node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null), null);
- ((Cluster)node.scheduler()).onDone(() -> {
- node.commandStores()
- .select(homeKey)
- .execute(() -> CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f) -> {
- if (f != null)
- {
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null), null);
- return;
- }
- switch (s)
- {
- case Truncated:
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null), null);
- break;
- case Invalidated:
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null);
- break;
- case Lost:
- node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null), null);
- break;
- case Other:
- // currently caught elsewhere in response tracking, but might help to throw an exception here
- }
- }));
- });
+ node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, txnId), null);
+ ((Cluster) node.scheduler()).onDone(() -> checkOnResult(homeKey, txnId, 0, null));
+ }
+ else if (fail instanceof SimulatedFault)
+ {
+ node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, id), null);
+ ((Cluster) node.scheduler()).onDone(() -> checkOnResult(null, id, 0, null));
+ }
+ else
+ {
+ node.agent().onUncaughtException(fail);
+ }
+ }
+ else if (success != null)
+ {
+ listener.onClientAction(MessageListener.ClientAction.SUCCESS, node.id(), id, success);
+ node.reply(client, replyContext, (ListResult) success, null);
}
else
{
- node.reply(client, replyContext, (ListResult) success, fail);
+ listener.onClientAction(MessageListener.ClientAction.UNKNOWN, node.id(), id, null);
+ node.agent().onUncaughtException(new NullPointerException("Success and Failure were both null"));
}
}
+
+ /**
+ * Asks {@link CheckOnResult} for the outcome of {@code txnId} and replies to the client with the matching
+ * {@link ListResult} fault. A Timeout or SimulatedFault from the check triggers another attempt.
+ * NOTE(review): when {@code attempt} reaches 3 the last failure is only reported to the agent; no reply is
+ * sent to the client on that path - confirm this is intended.
+ *
+ * @param homeKey home key to route the check by; when null one is chosen via {@code node.selectRandomHomeKey}
+ * @param txnId transaction whose outcome is being checked
+ * @param attempt number of attempts already made (callers start at 0)
+ * @param t failure from the previous attempt, reported once attempts are exhausted
+ */
+ private void checkOnResult(@Nullable RoutingKey homeKey, TxnId txnId, int attempt, Throwable t) {
+ if (attempt == 3)
+ {
+ node.agent().onUncaughtException(t);
+ return;
+ }
+ if (homeKey == null)
+ homeKey = node.selectRandomHomeKey(txnId);
+ // lambdas below need an effectively-final copy
+ RoutingKey finalHomeKey = homeKey;
+ node.commandStores().select(homeKey).execute(() -> CheckOnResult.checkOnResult(node, txnId, finalHomeKey, (s, f) -> {
+ if (f != null)
+ {
+ if (f instanceof Truncated)
+ {
+ node.reply(client, replyContext, ListResult.truncated(client, ((Packet)replyContext).requestId, txnId), null);
+ return;
+ }
+ // simulated network trouble: try again, reusing the home key already chosen
+ if (f instanceof Timeout || f instanceof SimulatedFault) checkOnResult(finalHomeKey, txnId, attempt + 1, f);
+ else
+ {
+ node.reply(client, replyContext, ListResult.failure(client, ((Packet)replyContext).requestId, txnId), null);
+ node.agent().onUncaughtException(f);
+ }
+ return;
+ }
+ switch (s)
+ {
+ case Truncated:
+ node.reply(client, replyContext, ListResult.truncated(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ case Invalidated:
+ node.reply(client, replyContext, ListResult.invalidated(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ case Lost:
+ node.reply(client, replyContext, ListResult.lost(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ case Other:
+ node.reply(client, replyContext, ListResult.other(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ default:
+ node.agent().onUncaughtException(new AssertionError("Unknown outcome: " + s));
+ }
+ }));
+ }
}
public final Txn txn;
+ private final MessageListener listener;
+ private TxnId id;
- public ListRequest(Txn txn)
+ public ListRequest(Txn txn, MessageListener listener)
{
this.txn = txn;
+ this.listener = listener;
}
@Override
public void process(Node node, Id client, ReplyContext replyContext)
{
- node.coordinate(txn).addCallback(new ResultCallback(node, client, replyContext, txn));
+ if (id != null)
+ throw new IllegalStateException("Called process multiple times");
+ id = node.nextTxnId(txn.kind(), txn.keys().domain());
+ listener.onClientAction(MessageListener.ClientAction.SUBMIT, node.id(), id, txn);
+ node.coordinate(id, txn).addCallback(new ResultCallback(node, client, replyContext, listener, id, txn));
}
@Override
@@ -173,7 +234,7 @@
@Override
public String toString()
{
- return txn.toString();
+ return id == null ? txn.toString() : id + " -> " + txn;
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/test/java/accord/impl/list/ListResult.java
index dc3311b..88ffeb2 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -32,6 +32,7 @@
public class ListResult implements Result, Reply
{
+ public enum Fault { HeartBeat, Invalidated, Lost, Other, Truncated, Failure }
public final Id client;
public final long requestId;
public final TxnId txnId;
@@ -39,6 +40,7 @@
public final Keys responseKeys;
public final int[][] read; // equal in size to keys.size()
public final ListUpdate update;
+ private final Fault fault;
public ListResult(Id client, long requestId, TxnId txnId, Seekables<?, ?> readKeys, Keys responseKeys, int[][] read, ListUpdate update)
{
@@ -49,6 +51,49 @@
this.responseKeys = responseKeys;
this.read = read;
this.update = update;
+ this.fault = null;
+ }
+
+ // Creates a result that carries only a fault: readKeys, responseKeys, read and update are all null.
+ private ListResult(Id client, long requestId, TxnId txnId, Fault fault)
+ {
+ this.client = client;
+ this.requestId = requestId;
+ this.txnId = txnId;
+ this.readKeys = null;
+ this.responseKeys = null;
+ this.read = null;
+ this.update = null;
+ this.fault = fault;
+ }
+
+ // Data-less result tagged Fault.HeartBeat.
+ public static ListResult heartBeat(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.HeartBeat);
+ }
+
+ // Data-less result tagged Fault.Invalidated.
+ public static ListResult invalidated(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Invalidated);
+ }
+
+ // Data-less result tagged Fault.Lost.
+ public static ListResult lost(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Lost);
+ }
+
+ // Data-less result tagged Fault.Other.
+ public static ListResult other(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Other);
+ }
+
+ // Data-less result tagged Fault.Truncated.
+ public static ListResult truncated(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Truncated);
+ }
+
+ // Data-less result tagged Fault.Failure.
+ public static ListResult failure(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Failure);
 }
@Override
@@ -57,6 +102,18 @@
return null;
}
+ // True when no fault is attached, i.e. the result was built through the public data-carrying constructor.
+ public boolean isSuccess()
+ {
+ return fault == null;
+ }
+
+ /**
+ * @return the fault attached to this result
+ * @throws IllegalStateException if this result is a success and therefore has no fault
+ */
+ public Fault fault()
+ {
+ if (isSuccess())
+ throw new IllegalStateException("Unable to find fault with successful results");
+ return fault;
+ }
+
@Override
public String toString()
{
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 8c1996a..4d04d51 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -131,7 +131,7 @@
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new);
- awaitUninterruptibly(node.start());
+ awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(topology, true);
return node;
}
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 879dec7..365ceec 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -113,7 +113,7 @@
clock, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
() -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
- awaitUninterruptibly(node.start());
+ awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(storeSupport.local.get(), true);
return node;
}
diff --git a/accord-core/src/test/java/accord/utils/Gen.java b/accord-core/src/test/java/accord/utils/Gen.java
index 62da8c5..af86340 100644
--- a/accord-core/src/test/java/accord/utils/Gen.java
+++ b/accord-core/src/test/java/accord/utils/Gen.java
@@ -21,8 +21,18 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntPredicate;
+import java.util.function.IntSupplier;
+import java.util.function.IntUnaryOperator;
import java.util.function.LongPredicate;
+import java.util.function.LongSupplier;
+import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
public interface Gen<A> {
/**
@@ -46,6 +56,16 @@
return r -> fn.apply(r, this.next(r));
}
+ // Adapts this generator to a primitive IntGen by applying fn to each generated value.
+ default IntGen mapToInt(ToIntFunction<A> fn)
+ {
+ return r -> fn.applyAsInt(next(r));
+ }
+
+ // Adapts this generator to a primitive LongGen by applying fn to each generated value.
+ default LongGen mapToLong(ToLongFunction<A> fn)
+ {
+ return r -> fn.applyAsLong(next(r));
+ }
+
default <B> Gen<B> flatMap(Function<? super A, Gen<? extends B>> mapper)
{
return rs -> mapper.apply(this.next(rs)).next(rs);
@@ -69,6 +89,16 @@
};
}
+ // Binds this generator to rs: each get() on the returned supplier draws the next value from rs.
+ default Supplier<A> asSupplier(RandomSource rs)
+ {
+ return () -> next(rs);
+ }
+
+ // Exposes this generator as a Stream drawing from rs. Stream.generate yields an infinite stream, so callers must bound it (e.g. with limit).
+ default Stream<A> asStream(RandomSource rs)
+ {
+ return Stream.generate(() -> next(rs));
+ }
+
interface IntGen extends Gen<Integer>
{
int nextInt(RandomSource random);
@@ -79,7 +109,12 @@
return nextInt(random);
}
- default Gen.IntGen filterInt(IntPredicate fn)
+ // Primitive-to-primitive map: applies fn to each generated int without boxing.
+ default IntGen mapAsInt(IntUnaryOperator fn)
+ {
+ return r -> fn.applyAsInt(nextInt(r));
+ }
+
+ default Gen.IntGen filterAsInt(IntPredicate fn)
{
return rs -> {
int value;
@@ -95,7 +130,17 @@
@Override
default Gen.IntGen filter(Predicate<Integer> fn)
{
- return filterInt(i -> fn.test(i));
+ return filterAsInt(i -> fn.test(i));
+ }
+
+ // Binds this generator to rs: each getAsInt() on the returned supplier draws the next int from rs.
+ default IntSupplier asIntSupplier(RandomSource rs)
+ {
+ return () -> nextInt(rs);
+ }
+
+ // Exposes this generator as an IntStream drawing from rs. IntStream.generate yields an infinite stream, so callers must bound it (e.g. with limit).
+ default IntStream asIntStream(RandomSource rs)
+ {
+ return IntStream.generate(() -> nextInt(rs));
 }
}
@@ -109,7 +154,12 @@
return nextLong(random);
}
- default Gen.LongGen filterLong(LongPredicate fn)
+ // Primitive-to-primitive map: applies fn to each generated long without boxing.
+ default LongGen mapAsLong(LongUnaryOperator fn)
+ {
+ return r -> fn.applyAsLong(nextLong(r));
+ }
+
+ default Gen.LongGen filterAsLong(LongPredicate fn)
{
return rs -> {
long value;
@@ -125,7 +175,17 @@
@Override
default Gen.LongGen filter(Predicate<Long> fn)
{
- return filterLong(i -> fn.test(i));
+ return filterAsLong(i -> fn.test(i));
+ }
+
+ // Binds this generator to rs: each getAsLong() on the returned supplier draws the next long from rs.
+ default LongSupplier asLongSupplier(RandomSource rs)
+ {
+ return () -> nextLong(rs);
+ }
+
+ // Exposes this generator as a LongStream drawing from rs. LongStream.generate yields an infinite stream, so callers must bound it (e.g. with limit).
+ default LongStream asLongStream(RandomSource rs)
+ {
+ return LongStream.generate(() -> nextLong(rs));
 }
}
}
diff --git a/accord-core/src/test/java/accord/utils/GenTest.java b/accord-core/src/test/java/accord/utils/GenTest.java
index c722335..7a04669 100644
--- a/accord-core/src/test/java/accord/utils/GenTest.java
+++ b/accord-core/src/test/java/accord/utils/GenTest.java
@@ -18,19 +18,25 @@
package accord.utils;
+import org.agrona.collections.IntArrayList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.List;
+import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import static accord.utils.Property.qt;
+import static org.assertj.core.api.Assertions.assertThat;
public class GenTest {
@Test
@@ -97,4 +103,74 @@
});
});
}
+
+ enum PickWeight {A, B, C}
+
+ /**
+  * Draws batches of {@code samples} values from an enum generator built with the weights 81, 10, 1 and
+  * checks the per-constant tallies of each batch: A must outnumber B, B must fall in [50, 200], and,
+  * when C shows up at all, B must outnumber C and C must fall in [1, 60].
+  */
+ @Test
+ public void pickWeight()
+ {
+     int samples = 1000;
+     Gen<PickWeight> weighted = Gens.enums().allWithWeights(PickWeight.class, 81, 10, 1);
+     Gen<Map<PickWeight, Integer>> histograms = rs -> {
+         Map<PickWeight, Integer> histogram = new EnumMap<>(PickWeight.class);
+         for (int drawn = 0; drawn < samples; drawn++)
+             histogram.merge(weighted.next(rs), 1, Integer::sum);
+         return histogram;
+     };
+     qt().forAll(histograms).check(counts -> {
+         // kept boxed on purpose: a constant that never appeared stays null, exactly as with a direct counts.get(...)
+         Integer a = counts.get(PickWeight.A);
+         Integer b = counts.get(PickWeight.B);
+         Integer c = counts.get(PickWeight.C);
+
+         // the original note expects ~810 for A
+         // NOTE(review): 81 + 10 + 1 = 92, so if the weights are relative shares this is nearer 880 - confirm against allWithWeights
+         assertThat(a).isGreaterThan(b);
+         // the original note expects ~100 for B
+         assertThat(b).isBetween(50, 200);
+
+         // C is rare enough that a batch may not contain it; only check it when it was drawn
+         if (c != null)
+         {
+             assertThat(b).isGreaterThan(c);
+             // the original note expects ~10 for C
+             assertThat(c).isBetween(1, 60);
+         }
+     });
+ }
+
+ /**
+  * Samples boolean lists built with {@code ofSize(samples)} from {@code biasedRepeatingRuns(ratio)} and
+  * checks two properties of each list: at least one recorded run of consecutive {@code true} values is
+  * longer than 5, and the fraction of {@code true} values lies within [ratio / 2, 0.1].
+  */
+ @Test
+ public void runs()
+ {
+     double ratio = 0.0625;
+     int samples = 1000;
+     Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio)).ofSize(samples).map(Runs::new);
+     qt().forAll(gen).check(runs -> {
+         assertThat(IntStream.of(runs.runs).filter(i -> i > 5).toArray()).isNotEmpty();
+         // divide by the real sample count instead of repeating the literal, so the fraction stays correct if samples changes
+         assertThat(runs.counts.get(true) / (double) samples).isBetween(ratio * .5, 0.1);
+     });
+ }
+
+ /**
+  * Summary of a boolean sample list: how often each boolean value occurs, and the length of every
+  * maximal run of consecutive {@code true} values in encounter order.
+  */
+ private static class Runs
+ {
+     /** Number of occurrences per boolean value; a value that never occurs has no entry. */
+     private final Map<Boolean, Long> counts;
+     /** Lengths of the runs of consecutive {@code true} values, in the order they were encountered. */
+     private final int[] runs;
+
+     /**
+      * @param samples the booleans to summarise
+      */
+     Runs(List<Boolean> samples)
+     {
+         this.counts = samples.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+         IntArrayList runs = new IntArrayList();
+         int run = 0; // length of the run of trues currently being extended; 0 means no open run
+         for (boolean b : samples)
+         {
+             if (b)
+             {
+                 run++;
+             }
+             else if (run > 0)
+             {
+                 runs.add(run);
+                 run = 0;
+             }
+         }
+         // a run that reaches the end of the list has no closing false, so record it here;
+         // without this an all-true list would report no runs at all
+         if (run > 0)
+             runs.add(run);
+         this.runs = runs.toIntArray();
+     }
+ }
}
diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java
index 4ccda46..244cd64 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -170,7 +170,7 @@
return RandomSource::nextBoolean;
}
- public Gen<Boolean> runs(double ratio)
+ public Gen<Boolean> biasedRepeatingRuns(double ratio)
{
Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to be larger than 0 and <= 1", ratio);
int steps = (int) (1 / ratio);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index cd7741b..f988703 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -321,7 +321,7 @@
SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
}
- AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::start).collect(toList()), (a, b) -> null).beginAsResult();
+ AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult();
while (sinks.processPending());
if (!startup.isDone()) throw new AssertionError();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 1600e03..a191904 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -177,7 +177,7 @@
MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
- awaitUninterruptibly(on.start());
+ awaitUninterruptibly(on.unsafeStart());
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index f77a0c0..109dd72 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -49,6 +49,10 @@
jvmArgs += ['-XX:+HeapDumpOnOutOfMemoryError', "-XX:HeapDumpPath=${buildDir}"]
}
+tasks.withType(Test) { // configures every task of type Test
+ jvmArgs '-XX:+HeapDumpOnOutOfMemoryError' // HotSpot flag: write a heap dump when the test JVM hits OutOfMemoryError; no -XX:HeapDumpPath is set here
+}
+
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.0'