blob: d8bb0ae0dcc8fa6a16ef02d84334d10f6fbbdc7c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package accord.burn;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import accord.burn.random.FrequentLargeRange;
import accord.impl.MessageListener;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Key;
import accord.impl.IntHashKey;
import accord.impl.basic.Cluster;
import accord.impl.basic.PendingRunnable;
import accord.impl.basic.PropagatingPendingQueue;
import accord.impl.basic.RandomDelayQueue;
import accord.impl.basic.RandomDelayQueue.Factory;
import accord.impl.TopologyFactory;
import accord.impl.basic.Packet;
import accord.impl.basic.SimulatedDelayedExecutorService;
import accord.impl.list.ListAgent;
import accord.impl.list.ListQuery;
import accord.impl.list.ListRead;
import accord.impl.list.ListRequest;
import accord.impl.list.ListResult;
import accord.impl.list.ListUpdate;
import accord.local.CommandStore;
import accord.local.Node.Id;
import accord.messages.MessageType;
import accord.primitives.Keys;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Txn;
import accord.utils.DefaultRandom;
import accord.utils.RandomSource;
import accord.utils.async.AsyncExecutor;
import accord.verify.StrictSerializabilityVerifier;
import static accord.impl.IntHashKey.forHash;
import static accord.utils.Utils.toArray;
/**
 * Randomized "burn" test for Accord: generates random range and key transactions, runs them through a
 * simulated cluster, and checks the observed replies with a {@link StrictSerializabilityVerifier}.
 * It can be run as a JUnit test ({@code testOne}) or from the command line ({@code main}).
 */
public class BurnTest
// Shared logger for seeds, per-reply debug output, summaries and failures.
private static final Logger logger = LoggerFactory.getLogger(BurnTest.class);
/**
 * Generates {@code operations} request packets for one burn run.
 *
 * Each packet goes from a randomly chosen client to a randomly chosen node and carries a
 * {@link ListRequest} that is either a range read over one or two random hash ranges, or a key
 * transaction that reads one or two distinct random keys and, for writes, updates up to two keys.
 *
 * @param random    source of all random choices made here
 * @param listener  passed to every generated {@link ListRequest}
 * @param executor  passed to {@link ListUpdate}s and, unless the read is chosen to run in the command
 *                  store, to {@link ListRead}s
 * @param clients   candidate senders
 * @param nodes     candidate recipients
 * @param keyCount  size of the key universe; also the length of the per-key write counters
 * @param operations number of packets to generate; packet {@code count} uses {@code count} as its id
 * @return the generated packets, in generation order
 */
static List<Packet> generate(RandomSource random, MessageListener listener, Function<? super CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int keyCount, int operations)
List<Key> keys = new ArrayList<>();
for (int i = 0 ; i < keyCount ; ++i)
// NOTE(review): the body of this loop (presumably adding the i-th key to 'keys') is not visible here — confirm.
List<Packet> packets = new ArrayList<>();
// next[i] holds the last value written to key i; writes use ++next[i], so values written to a key increase.
int[] next = new int[keyCount];
// Passed to random.decide(...) per read; presumably the probability that a read uses Function.identity()
// (i.e. runs in the command store) rather than 'executor' — confirm RandomSource.decide semantics.
double readInCommandStore = random.nextDouble();
for (int count = 0 ; count < operations ; ++count)
Id client = clients.get(random.nextInt(clients.size()));
Id node = nodes.get(random.nextInt(nodes.size()));
boolean isRangeQuery = random.nextBoolean();
if (isRangeQuery)
int rangeCount = 1 + random.nextInt(2);
List<Range> requestRanges = new ArrayList<>();
while (--rangeCount >= 0)
// Range end j is at least 1; the start i lies up to 1 + 0x1ffe hashes before j, clamped at 0.
int j = 1 + random.nextInt(0xffff), i = Math.max(0, j - (1 + random.nextInt(0x1ffe)));
requestRanges.add(IntHashKey.range(forHash(i), forHash(j)));
Ranges ranges = Ranges.of(requestRanges.toArray(new Range[0]));
ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, ranges, ranges);
ListQuery query = new ListQuery(client, count);
// Range queries are read-only: no update is attached.
ListRequest request = new ListRequest(new Txn.InMemory(ranges, read, query, null), listener);
packets.add(new Packet(client, node, count, request));
// Key-based transaction (presumably the else-branch of isRangeQuery; the 'else' is not visible here).
boolean isWrite = random.nextBoolean();
int readCount = 1 + random.nextInt(2);
int writeCount = isWrite ? random.nextInt(3) : 0;
TreeSet<Key> requestKeys = new TreeSet<>();
while (readCount-- > 0)
// Passing requestKeys as the exclusion set keeps the read keys distinct.
requestKeys.add(randomKey(random, keys, requestKeys));
// 'update' is only dereferenced in the loop below, which runs only when isWrite (writeCount is 0 otherwise).
ListUpdate update = isWrite ? new ListUpdate(executor) : null;
while (writeCount-- > 0)
// Excluding update.keySet() keeps the written keys distinct within this update.
int i = randomKeyIndex(random, keys, update.keySet());
update.put(keys.get(i), ++next[i]);
// Snapshot of the read keys, taken before the isWrite step below.
Keys readKeys = new Keys(requestKeys);
if (isWrite)
// NOTE(review): the statement guarded by 'if (isWrite)' is not clearly visible in this view (presumably it adds
// the write keys to requestKeys); as shown, the ListRead construction would be the guarded statement — confirm.
ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, readKeys, new Keys(requestKeys));
ListQuery query = new ListQuery(client, count);
ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update), listener);
packets.add(new Packet(client, node, count, request));
return packets;
private static Key randomKey(RandomSource random, List<Key> keys, Set<Key> notIn)
return keys.get(randomKeyIndex(random, keys, notIn));
private static int randomKeyIndex(RandomSource random, List<Key> keys, Set<Key> notIn)
return randomKeyIndex(random, keys, notIn::contains);
private static int randomKeyIndex(RandomSource random, List<Key> keys, Predicate<Key> notIn)
int i;
//noinspection StatementWithEmptyBody
while (notIn.test(keys.get(i = random.nextInt(keys.size()))));
return i;
static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws ExecutionException, InterruptedException
ReconcilingLogger logReconciler = new ReconcilingLogger(logger);
RandomSource random1 = new DefaultRandom(), random2 = new DefaultRandom();
ExecutorService exec = Executors.newFixedThreadPool(2);
Future<?> f1;
try (@SuppressWarnings("unused") ReconcilingLogger.Session session = logReconciler.nextSession())
f1 = exec.submit(() -> burn(random1, topologyFactory, clients, nodes, keyCount, operations, concurrency));
Future<?> f2;
try (@SuppressWarnings("unused") ReconcilingLogger.Session session = logReconciler.nextSession())
f2 = exec.submit(() -> burn(random2, topologyFactory, clients, nodes, keyCount, operations, concurrency));
assert logReconciler.reconcile();
/**
 * Runs one simulated burn.
 *
 * It builds a simulated cluster driven by a random delay queue and feeds it the requests from
 * {@code generate}, keeping at most {@code concurrency} requests in flight. Every reply is checked with a
 * {@link StrictSerializabilityVerifier}.
 *
 * A logical clock is ticked once when each request starts and once when its (non-heartbeat) reply is
 * recorded. The run therefore fails with an AssertionError unless the clock ends at {@code operations * 2}.
 * If the simulation throws, every request and its reply are logged and the failure is rethrown.
 *
 * @param random          source of all randomness for this run (forked for retries, time and nodes)
 * @param topologyFactory topology for the simulated cluster
 * @param clients         client ids issuing requests
 * @param nodes           node ids receiving requests
 * @param keyCount        size of the key universe
 * @param operations      number of requests to generate and expect replies for
 * @param concurrency     maximum number of requests in flight at once
 */
static void burn(RandomSource random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency)
// Failures reported by the agent and propagated by the pending queue.
List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
RandomDelayQueue delayQueue = new Factory(random).get();
PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, delayQueue);
RandomSource retryRandom = random.fork();
// Retries requested by the agent are re-enqueued after a random delay in seconds drawn from retryRandom.nextInt(1, 15).
ListAgent agent = new ListAgent(1000L, failures::add, retry -> {
long delay = retryRandom.nextInt(1, 15);
queue.add((PendingRunnable)retry::run, delay, TimeUnit.SECONDS);
// Each call yields a clock on its own forked RandomSource: the queue's current time plus a random
// offset (small: 50us-5000us, large: 1ms-10ms, mixed per FrequentLargeRange's ratio(1, 5)), clamped at 0.
Supplier<LongSupplier> nowSupplier = () -> {
RandomSource forked = random.fork();
return FrequentLargeRange.builder(forked)
.ratio(1, 5)
.small(50, 5000, TimeUnit.MICROSECONDS)
.large(1, 10, TimeUnit.MILLISECONDS)
.mapAsLong(j -> Math.max(0, queue.nowInMillis() + j))
StrictSerializabilityVerifier strictSerializable = new StrictSerializabilityVerifier(keyCount);
// A single simulated executor is shared by every command store.
SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent);
Function<CommandStore, AsyncExecutor> executor = ignore -> globalExecutor;
MessageListener listener = MessageListener.get();
Packet[] requests = toArray(generate(random, listener, executor, clients, nodes, keyCount, operations), Packet[]::new);
// starts[i] is the logical clock value when request i was issued; replies[i] is its recorded reply.
int[] starts = new int[requests.length];
Packet[] replies = new Packet[requests.length];
AtomicInteger acks = new AtomicInteger();
AtomicInteger nacks = new AtomicInteger();
AtomicInteger lost = new AtomicInteger();
AtomicInteger truncated = new AtomicInteger();
AtomicInteger failedToCheck = new AtomicInteger();
// Logical clock: ticks on each request start and each recorded reply.
AtomicInteger clock = new AtomicInteger();
// Index of the next request to issue.
AtomicInteger requestIndex = new AtomicInteger();
Queue<Packet> initialRequests = new ArrayDeque<>();
// Reserve the initial in-flight window of min(concurrency, requests.length) requests and stamp their start times.
// NOTE(review): the statement enqueuing each reserved request into initialRequests is not visible here — confirm.
for (int max = Math.min(concurrency, requests.length) ; requestIndex.get() < max ; )
int i = requestIndex.getAndIncrement();
starts[i] = clock.incrementAndGet();
// not used for atomicity, just for encapsulation
AtomicReference<Runnable> onSubmitted = new AtomicReference<>();
// Handles each reply delivered by the cluster: issues the next request, then records and verifies the reply.
Consumer<Packet> responseSink = packet -> {
ListResult reply = (ListResult) packet.message;
// A reply was already recorded for this request id; the guarded statement is not visible here (presumably it ignores the duplicate).
if (replies[(int)packet.replyId] != null)
// Keep the in-flight window full: each reply triggers the next unissued request, if any.
if (requestIndex.get() < requests.length)
int i = requestIndex.getAndIncrement();
starts[i] = clock.incrementAndGet();
// NOTE(review): the action taken when the last request is issued is not visible here — confirm.
if (i == requests.length - 1)
if (!reply.isSuccess() && reply.fault() == ListResult.Fault.HeartBeat)
return; // interrupted; will fetch our actual reply once rest of simulation is finished (but wanted to send another request to keep correct number in flight)
// [start, end] is the logical-time window in which this operation was observed.
int start = starts[(int)packet.replyId];
int end = clock.incrementAndGet();
logger.debug("{} at [{}, {}]", reply, start, end);
replies[(int)packet.replyId] = packet;
// Classify failed replies; anything other than the known faults is a test failure.
if (!reply.isSuccess())
switch (reply.fault())
case Lost: lost.incrementAndGet(); break;
case Invalidated: nacks.incrementAndGet(); break;
case Failure: failedToCheck.incrementAndGet(); break;
case Truncated: truncated.incrementAndGet(); break;
// txn was applied?, but client saw a timeout, so response isn't known
case Other: break;
default: throw new AssertionError("Unexpected fault: " + reply.fault());
// Feed each response key's read value and written value (if any) to the verifier.
// NOTE(review): the loop bound and the source of 'read' are garbled in this view — confirm against the full file.
for (int i = 0 ; i < ; ++i)
Key key = reply.responseKeys.get(i);
int k = key(key);
int[] read =[i];
// -1 means this operation did not write the key.
int write = reply.update == null ? -1 : reply.update.getOrDefault(key, -1);
if (read != null)
strictSerializable.witnessRead(k, read);
if (write >= 0)
strictSerializable.witnessWrite(k, write);
// Close out this operation with its logical-time window.
strictSerializable.apply(start, end);
// NOTE(review): the try this catch pairs with, and the handler body, are not visible here.
catch (Throwable t)
// Per-message-type statistics returned by the cluster run.
EnumMap<MessageType, Cluster.Stats> messageStatsMap;
// Run the cluster simulation. NOTE(review): the start of this call is garbled in this view
// (presumably a Cluster run method taking the node ids as an Id[]) — confirm against the full file.
messageStatsMap =, Id[]::new), listener, () -> queue, queue::checkFailures,
responseSink, globalExecutor,
random::fork, nowSupplier,
topologyFactory, initialRequests::poll,
// On any failure, log every request alongside its reply (null if none), then rethrow.
catch (Throwable t)
for (int i = 0 ; i < requests.length ; ++i)
{"{}", requests[i]);"\t\t" + replies[i]);
throw t;
// Log a summary of reply outcomes and message counts. NOTE(review): the logging calls on this line are garbled in this view.
}"Received {} acks, {} nacks, {} lost, {} truncated ({} total) to {} operations", acks.get(), nacks.get(), lost.get(), truncated.get(), acks.get() + nacks.get() + lost.get() + truncated.get(), operations);"Message counts: {}", messageStatsMap.entrySet());
// Every request must have started and received a recorded reply: two clock ticks per operation.
if (clock.get() != operations * 2)
StringBuilder sb = new StringBuilder();
for (int i = 0 ; i < requests.length ; ++i)
// since this only happens when operations are lost, only log the ones without a reply to lower the amount of noise
if (replies[i] == null)
throw new AssertionError("Incomplete set of responses; clock=" + clock.get() + ", expected operations=" + (operations * 2));
public static void main(String[] args)
int count = 1;
int operations = 1000;
Long overrideSeed = null;
LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong;
for (int i = 0 ; i < args.length ; i += 2)
switch (args[i])
default: throw new IllegalArgumentException("Invalid option: " + args[i]);
case "-c":
count = Integer.parseInt(args[i + 1]);
overrideSeed = null;
case "-s":
overrideSeed = Long.parseLong(args[i + 1]);
count = 1;
case "-o":
operations = Integer.parseInt(args[i + 1]);
case "--loop-seed":
seedGenerator = new DefaultRandom(Long.parseLong(args[i + 1]))::nextLong;
while (count-- > 0)
run(overrideSeed != null ? overrideSeed : seedGenerator.getAsLong(), operations);
@Timeout(value = 3, unit = TimeUnit.MINUTES)
public void testOne()
run(ThreadLocalRandom.current().nextLong(), 1000);
/**
 * Runs a single randomized burn whose parameters are all derived from {@code seed}.
 *
 * It picks {@code 1 + random.nextInt(4)} clients, a replication factor, between {@code rf} and
 * {@code rf * 3} nodes (per {@code random.nextInt(rf, rf * 3)}) and a key-range topology, then calls
 * {@code burn}. Any failure is logged with the seed and rethrown via {@code SimulationException.wrap}.
 *
 * @param seed       seed for every random choice in this run
 * @param operations number of operations (presumably forwarded to burn; that argument is not visible here)
 */
private static void run(long seed, int operations)
// NOTE(review): this line is garbled in this view (presumably a logger call printing the seed) — confirm.
{"Seed: {}", seed);
Cluster.trace.trace("Seed: {}", seed);
RandomSource random = new DefaultRandom(seed);
List<Id> clients = generateIds(true, 1 + random.nextInt(4));
// Replication factor, assuming nextFloat() is uniform in [0, 1):
// 20% random.nextInt(2, 9), 20% 3, 30% 5, 10% 7, 20% 9.
int rf;
float chance = random.nextFloat();
if (chance < 0.2f) { rf = random.nextInt(2, 9); }
else if (chance < 0.4f) { rf = 3; }
else if (chance < 0.7f) { rf = 5; }
else if (chance < 0.8f) { rf = 7; }
else { rf = 9; }
List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
// NOTE(review): burn takes 7 arguments but only 4 are visible here (presumably clients, nodes and operations
// were dropped from this view), and the enclosing 'try' is not visible — confirm against the full file.
burn(random, new TopologyFactory(rf, IntHashKey.ranges(random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 3))),
5 + random.nextInt(15),
10 + random.nextInt(30));
catch (Throwable t)
logger.error("Exception running burn test for seed {}:", seed, t);
throw SimulationException.wrap(seed, t);
private static List<Id> generateIds(boolean clients, int count)
List<Id> ids = new ArrayList<>();
for (int i = 1; i <= count ; ++i)
ids.add(new Id(clients ? -i : i));
return ids;
private static int key(Key key)
return ((IntHashKey) key).key;