// blob: caf642e21813c354bd2a13f020eb4c1f4254a65a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.FileSystem;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInstanceInitializer;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableBiConsumer;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableConsumer;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
import org.apache.cassandra.distributed.impl.DirectStreamingConnectionFactory;
import org.apache.cassandra.distributed.impl.IsolatedExecutor;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.service.paxos.BallotGenerator;
import org.apache.cassandra.service.paxos.PaxosPrepare;
import org.apache.cassandra.simulator.RandomSource.Choices;
import org.apache.cassandra.simulator.asm.InterceptAsClassTransformer;
import org.apache.cassandra.simulator.asm.NemesisFieldSelectors;
import org.apache.cassandra.simulator.cluster.ClusterActions;
import org.apache.cassandra.simulator.cluster.ClusterActions.TopologyChange;
import org.apache.cassandra.simulator.systems.Failures;
import org.apache.cassandra.simulator.systems.InterceptedWait.CaptureSites.Capture;
import org.apache.cassandra.simulator.systems.InterceptibleThread;
import org.apache.cassandra.simulator.systems.InterceptingGlobalMethods;
import org.apache.cassandra.simulator.systems.InterceptingGlobalMethods.ThreadLocalRandomCheck;
import org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods;
import org.apache.cassandra.simulator.systems.InterceptingExecutorFactory;
import org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods.IfInterceptibleThread;
import org.apache.cassandra.simulator.systems.NetworkConfig;
import org.apache.cassandra.simulator.systems.NetworkConfig.PhaseConfig;
import org.apache.cassandra.simulator.systems.SchedulerConfig;
import org.apache.cassandra.simulator.systems.SimulatedFutureActionScheduler;
import org.apache.cassandra.simulator.systems.SimulatedSystems;
import org.apache.cassandra.simulator.systems.SimulatedBallots;
import org.apache.cassandra.simulator.systems.SimulatedExecution;
import org.apache.cassandra.simulator.systems.SimulatedFailureDetector;
import org.apache.cassandra.simulator.systems.SimulatedMessageDelivery;
import org.apache.cassandra.simulator.systems.SimulatedSnitch;
import org.apache.cassandra.simulator.systems.SimulatedTime;
import org.apache.cassandra.simulator.utils.ChanceRange;
import org.apache.cassandra.simulator.utils.IntRange;
import org.apache.cassandra.simulator.utils.KindOfSequence;
import org.apache.cassandra.simulator.utils.LongRange;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.Closeable;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.memory.BufferPools;
import org.apache.cassandra.utils.memory.HeapPool;
import static java.lang.Integer.min;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.distributed.impl.AbstractCluster.getSharedClassPredicate;
import static org.apache.cassandra.simulator.SimulatorUtils.failWithOOM;
import static org.apache.cassandra.utils.Shared.Scope.ANY;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
/**
* Wraps a Cluster and a Simulation to run upon it
*/
@SuppressWarnings("RedundantCast")
public class ClusterSimulation<S extends Simulation> implements AutoCloseable
{
/**
 * Classes passed as the second ("share") argument to {@code getSharedClassPredicate} when the
 * constructor builds the predicate it hands to {@code withSharedClasses}.
 * NOTE(review): presumably these Guava/Netty callback types must resolve to the same Class object
 * in every instance class loader so they can cross the isolation boundary - confirm against AbstractCluster.
 */
public static final Class<?>[] SHARE = new Class[]
{
AsyncFunction.class,
FutureCallback.class,
io.netty.util.concurrent.GenericFutureListener.class,
io.netty.channel.FileRegion.class,
io.netty.util.ReferenceCounted.class
};
/**
 * Classes passed as the first ("isolate") argument to {@code getSharedClassPredicate}; currently empty.
 */
public static final Class<?>[] ISOLATE = new Class<?>[0];
/**
 * Creates the {@link Simulation} that will be run against the cluster.
 * The {@link ClusterSimulation} constructor invokes this as its last step, once the simulated
 * systems, the scheduler, the (not yet started) cluster and the cluster action options exist.
 *
 * @param <S> the concrete simulation type produced
 */
public interface SimulationFactory<S extends Simulation>
{
/**
 * @param simulated the simulated systems (time, delivery, execution, ...) assembled by the constructor
 * @param scheduler the scheduler obtained from the builder's {@link SchedulerFactory}
 * @param cluster the cluster the simulation operates on
 * @param options topology-change and replication-factor options for cluster actions
 * @return the simulation to run
 */
S create(SimulatedSystems simulated, RunnableActionScheduler scheduler, Cluster cluster, ClusterActions.Options options);
}
/**
 * Creates the {@link RunnableActionScheduler} used by a simulation.
 * The {@link ClusterSimulation} constructor calls this once, passing its own random source.
 */
public interface SchedulerFactory
{
/**
 * @param random the random source the scheduler may draw from
 * @return the scheduler to hand to the {@link SimulationFactory}
 */
RunnableActionScheduler create(RandomSource random);
}
/**
 * Fluent, mutable holder for the settings a {@link ClusterSimulation} is constructed from.
 * <p>
 * Every setter stores its argument and returns {@code this} so that calls can be chained. Concrete
 * subclasses implement {@link #create(long)} to turn the collected settings into a simulation. The
 * fields are {@code protected} and are read directly by the {@link ClusterSimulation} constructor.
 *
 * @param <S> the kind of {@link Simulation} the built {@link ClusterSimulation} will run
 */
@SuppressWarnings("UnusedReturnValue")
public static abstract class Builder<S extends Simulation>
{
    // ---- randomness, identity and sizing -------------------------------------------------------
    protected Supplier<RandomSource> randomSupplier = RandomSource.Default::new;
    protected int uniqueNum = 0;
    // total thread budget; handed to the ThreadAllocator by the ClusterSimulation constructor
    protected int threadCount;
    protected int concurrency = 10;
    protected IntRange nodeCount = new IntRange(4, 16);
    protected IntRange dcCount = new IntRange(1, 2);
    protected IntRange primaryKeySeconds = new IntRange(5, 30);
    protected IntRange withinKeyConcurrency = new IntRange(2, 5);
    protected TopologyChange[] topologyChanges = TopologyChange.values();
    protected int topologyChangeLimit = -1;
    protected int primaryKeyCount;
    protected int secondsToSimulate;

    // ---- probabilities -------------------------------------------------------------------------
    protected ChanceRange normalNetworkDropChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0f, 0.001f);
    protected ChanceRange normalNetworkDelayChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.01f, 0.1f);
    protected ChanceRange flakyNetworkDropChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.01f, 0.1f);
    protected ChanceRange flakyNetworkDelayChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.01f, 0.1f);
    protected ChanceRange networkPartitionChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.0f, 0.1f);
    protected ChanceRange networkFlakyChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.0f, 0.1f);
    protected ChanceRange monitorDelayChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.01f, 0.1f);
    protected ChanceRange schedulerDelayChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.01f, 0.1f);
    protected ChanceRange timeoutChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.01f, 0.1f);
    protected ChanceRange readChance = new ChanceRange(RandomSource::uniformFloat, 0.05f, 0.95f);
    protected ChanceRange nemesisChance = new ChanceRange(rs -> rs.qlog2uniformFloat(4), 0.001f, 0.01f);

    // ---- durations, all expressed in nanoseconds -----------------------------------------------
    protected LongRange normalNetworkLatencyNanos = new LongRange(1, 2, MILLISECONDS, NANOSECONDS);
    protected LongRange normalNetworkDelayNanos = new LongRange(2, 100, MILLISECONDS, NANOSECONDS);
    protected LongRange flakyNetworkLatencyNanos = new LongRange(2, 100, MILLISECONDS, NANOSECONDS);
    protected LongRange flakyNetworkDelayNanos = new LongRange(2, 100, MILLISECONDS, NANOSECONDS);
    protected LongRange networkReconfigureInterval = new LongRange(50, 5000, MICROSECONDS, NANOSECONDS);
    protected LongRange schedulerJitterNanos = new LongRange(100, 2000, MICROSECONDS, NANOSECONDS);
    protected LongRange schedulerDelayNanos = new LongRange(0, 50, MICROSECONDS, NANOSECONDS);
    protected LongRange schedulerLongDelayNanos = new LongRange(50, 5000, MICROSECONDS, NANOSECONDS);
    protected LongRange clockDriftNanos = new LongRange(1, 5000, MILLISECONDS, NANOSECONDS);
    protected LongRange clockDiscontinuitIntervalNanos = new LongRange(10, 60, SECONDS, NANOSECONDS);
    protected LongRange topologyChangeIntervalNanos = new LongRange(5, 15, SECONDS, NANOSECONDS);

    // ---- request timeouts; written into each instance's config as millisecond strings ----------
    protected long contentionTimeoutNanos = MILLISECONDS.toNanos(500L);
    protected long writeTimeoutNanos = SECONDS.toNanos(1L);
    protected long readTimeoutNanos = SECONDS.toNanos(2L);
    protected long requestTimeoutNanos = SECONDS.toNanos(2L);

    // ---- pluggable collaborators and listeners -------------------------------------------------
    protected SchedulerFactory schedulerFactory = schedulerFactory(RunnableActionScheduler.Kind.values());
    protected Debug debug = new Debug();
    protected Capture capture = new Capture(false, false, false);
    protected HeapPool.Logged.Listener memoryListener;
    protected SimulatedTime.Listener timeListener = (i1, i2) -> {};
    protected LongConsumer onThreadLocalRandomCheck;

    /** Returns the current debug settings. */
    public Debug debug()
    {
        return debug;
    }

    /** Replaces the debug settings with a new {@link Debug} built from the given levels and primary keys. */
    public Builder<S> debug(EnumMap<Debug.Info, Debug.Levels> debug, int[] primaryKeys)
    {
        this.debug = new Debug(debug, primaryKeys);
        return this;
    }

    /** Sets {@link #uniqueNum}. */
    public Builder<S> unique(int num)
    {
        this.uniqueNum = num;
        return this;
    }

    /** Sets the total number of threads to distribute between the nodes' thread pools. */
    public Builder<S> threadCount(int count)
    {
        this.threadCount = count;
        return this;
    }

    /** Sets the range the number of nodes is selected from. */
    public Builder<S> nodes(IntRange range)
    {
        this.nodeCount = range;
        return this;
    }

    /** Sets the range the number of nodes is selected from, as a new {@code IntRange(min, max)}. */
    public Builder<S> nodes(int min, int max)
    {
        this.nodeCount = new IntRange(min, max);
        return this;
    }

    /** Sets the range the number of data centres is selected from. */
    public Builder<S> dcs(IntRange range)
    {
        this.dcCount = range;
        return this;
    }

    /** Sets the range the number of data centres is selected from, as a new {@code IntRange(min, max)}. */
    public Builder<S> dcs(int min, int max)
    {
        this.dcCount = new IntRange(min, max);
        return this;
    }

    /** Sets {@link #concurrency}. */
    public Builder<S> concurrency(int concurrency)
    {
        this.concurrency = concurrency;
        return this;
    }

    /** Returns {@link #primaryKeySeconds}. */
    public IntRange primaryKeySeconds()
    {
        return primaryKeySeconds;
    }

    /** Sets {@link #primaryKeySeconds}. */
    public Builder<S> primaryKeySeconds(IntRange range)
    {
        this.primaryKeySeconds = range;
        return this;
    }

    /** Sets {@link #withinKeyConcurrency}. */
    public Builder<S> withinKeyConcurrency(IntRange range)
    {
        this.withinKeyConcurrency = range;
        return this;
    }

    /** Sets {@link #withinKeyConcurrency} to a new {@code IntRange(min, max)}. */
    public Builder<S> withinKeyConcurrency(int min, int max)
    {
        this.withinKeyConcurrency = new IntRange(min, max);
        return this;
    }

    /** Sets the topology changes that cluster actions may choose between. */
    public Builder<S> topologyChanges(TopologyChange[] topologyChanges)
    {
        this.topologyChanges = topologyChanges;
        return this;
    }

    /** Sets the range the interval between topology changes is derived from. */
    public Builder<S> topologyChangeIntervalNanos(LongRange topologyChangeIntervalNanos)
    {
        this.topologyChangeIntervalNanos = topologyChangeIntervalNanos;
        return this;
    }

    /** Sets {@link #topologyChangeLimit} (defaults to {@code -1}). */
    public Builder<S> topologyChangeLimit(int topologyChangeLimit)
    {
        this.topologyChangeLimit = topologyChangeLimit;
        return this;
    }

    /** Returns {@link #primaryKeyCount}. */
    public int primaryKeyCount()
    {
        return primaryKeyCount;
    }

    /** Sets {@link #primaryKeyCount}. */
    public Builder<S> primaryKeyCount(int count)
    {
        this.primaryKeyCount = count;
        return this;
    }

    /** Returns {@link #secondsToSimulate}. */
    public int secondsToSimulate()
    {
        return secondsToSimulate;
    }

    /** Sets {@link #secondsToSimulate}. */
    public Builder<S> secondsToSimulate(int seconds)
    {
        this.secondsToSimulate = seconds;
        return this;
    }

    /** Sets {@link #networkPartitionChance}. */
    public Builder<S> networkPartitionChance(ChanceRange partitionChance)
    {
        this.networkPartitionChance = partitionChance;
        return this;
    }

    /** Sets {@link #networkFlakyChance}. */
    public Builder<S> networkFlakyChance(ChanceRange flakyChance)
    {
        this.networkFlakyChance = flakyChance;
        return this;
    }

    /** Sets {@link #networkReconfigureInterval}. */
    public Builder<S> networkReconfigureInterval(LongRange reconfigureIntervalNanos)
    {
        this.networkReconfigureInterval = reconfigureIntervalNanos;
        return this;
    }

    /** Sets the drop chance of the normal (non-flaky) network phase. */
    public Builder<S> networkDropChance(ChanceRange dropChance)
    {
        this.normalNetworkDropChance = dropChance;
        return this;
    }

    /** Sets the delay chance of the normal (non-flaky) network phase. */
    public Builder<S> networkDelayChance(ChanceRange delayChance)
    {
        this.normalNetworkDelayChance = delayChance;
        return this;
    }

    /** Sets the latency range of the normal (non-flaky) network phase. */
    public Builder<S> networkLatencyNanos(LongRange networkLatencyNanos)
    {
        this.normalNetworkLatencyNanos = networkLatencyNanos;
        return this;
    }

    /** Sets the delay range of the normal (non-flaky) network phase. */
    public Builder<S> networkDelayNanos(LongRange networkDelayNanos)
    {
        this.normalNetworkDelayNanos = networkDelayNanos;
        return this;
    }

    /** Sets the drop chance of the flaky network phase. */
    public Builder<S> flakyNetworkDropChance(ChanceRange dropChance)
    {
        this.flakyNetworkDropChance = dropChance;
        return this;
    }

    /** Sets the delay chance of the flaky network phase. */
    public Builder<S> flakyNetworkDelayChance(ChanceRange delayChance)
    {
        this.flakyNetworkDelayChance = delayChance;
        return this;
    }

    /** Sets the latency range of the flaky network phase. */
    public Builder<S> flakyNetworkLatencyNanos(LongRange networkLatencyNanos)
    {
        this.flakyNetworkLatencyNanos = networkLatencyNanos;
        return this;
    }

    /** Sets the delay range of the flaky network phase. */
    public Builder<S> flakyNetworkDelayNanos(LongRange networkDelayNanos)
    {
        this.flakyNetworkDelayNanos = networkDelayNanos;
        return this;
    }

    /** Sets {@link #clockDriftNanos}. */
    public Builder<S> clockDriftNanos(LongRange clockDriftNanos)
    {
        this.clockDriftNanos = clockDriftNanos;
        return this;
    }

    /** Sets {@link #clockDiscontinuitIntervalNanos}. */
    public Builder<S> clockDiscontinuityIntervalNanos(LongRange clockDiscontinuityIntervalNanos)
    {
        this.clockDiscontinuitIntervalNanos = clockDiscontinuityIntervalNanos;
        return this;
    }

    /** Sets {@link #schedulerDelayChance}. */
    public Builder<S> schedulerDelayChance(ChanceRange delayChance)
    {
        this.schedulerDelayChance = delayChance;
        return this;
    }

    /** Sets {@link #schedulerJitterNanos}. */
    public Builder<S> schedulerJitterNanos(LongRange schedulerJitterNanos)
    {
        this.schedulerJitterNanos = schedulerJitterNanos;
        return this;
    }

    /** Returns {@link #schedulerJitterNanos}. */
    public LongRange schedulerJitterNanos()
    {
        return schedulerJitterNanos;
    }

    /** Sets {@link #schedulerDelayNanos}. */
    public Builder<S> schedulerDelayNanos(LongRange schedulerDelayNanos)
    {
        this.schedulerDelayNanos = schedulerDelayNanos;
        return this;
    }

    /** Sets {@link #schedulerLongDelayNanos}. */
    public Builder<S> schedulerLongDelayNanos(LongRange schedulerLongDelayNanos)
    {
        this.schedulerLongDelayNanos = schedulerLongDelayNanos;
        return this;
    }

    /** Sets {@link #timeoutChance}. */
    public Builder<S> timeoutChance(ChanceRange timeoutChance)
    {
        this.timeoutChance = timeoutChance;
        return this;
    }

    /** Returns {@link #readChance}. */
    public ChanceRange readChance()
    {
        return readChance;
    }

    /** Returns {@link #withinKeyConcurrency}. */
    public IntRange withinKeyConcurrency()
    {
        return withinKeyConcurrency;
    }

    /** Returns {@link #concurrency}. */
    public int concurrency()
    {
        return concurrency;
    }

    /** Sets {@link #readChance}. */
    public Builder<S> readChance(ChanceRange readChance)
    {
        this.readChance = readChance;
        return this;
    }

    /** Sets {@link #nemesisChance}. */
    public Builder<S> nemesisChance(ChanceRange nemesisChance)
    {
        this.nemesisChance = nemesisChance;
        return this;
    }

    /** Replaces the scheduler factory with one that picks between the given scheduler kinds. */
    public Builder<S> scheduler(RunnableActionScheduler.Kind... kinds)
    {
        this.schedulerFactory = schedulerFactory(kinds);
        return this;
    }

    /**
     * Builds the scheduler of future actions from this builder's network and scheduler settings.
     * The kind of sequence is picked first, using {@code random}, before any configuration object is built.
     *
     * @param nodeCount number of nodes in the cluster
     * @param time the simulated time source
     * @param random the random source used both to choose the sequence kind and by the scheduler itself
     */
    public SimulatedFutureActionScheduler futureActionScheduler(int nodeCount, SimulatedTime time, RandomSource random)
    {
        KindOfSequence kind = Choices.random(random, KindOfSequence.values()).choose(random);

        PhaseConfig normalPhase = new PhaseConfig(normalNetworkDropChance, normalNetworkDelayChance,
                                                  normalNetworkLatencyNanos, normalNetworkDelayNanos);
        PhaseConfig flakyPhase = new PhaseConfig(flakyNetworkDropChance, flakyNetworkDelayChance,
                                                 flakyNetworkLatencyNanos, flakyNetworkDelayNanos);
        NetworkConfig network = new NetworkConfig(normalPhase, flakyPhase,
                                                  networkPartitionChance, networkFlakyChance, networkReconfigureInterval);
        SchedulerConfig scheduling = new SchedulerConfig(schedulerDelayChance, schedulerDelayNanos, schedulerLongDelayNanos);

        return new SimulatedFutureActionScheduler(kind, nodeCount, random, time, network, scheduling);
    }

    /**
     * Returns a factory that, on each call, picks one of {@code kinds} using the supplied random
     * source and instantiates the matching scheduler. An unrecognised kind is an {@link AssertionError}.
     */
    static SchedulerFactory schedulerFactory(RunnableActionScheduler.Kind... kinds)
    {
        return random -> {
            RunnableActionScheduler.Kind chosen = Choices.random(random, kinds).choose(random);
            switch (chosen)
            {
                case SEQUENTIAL:
                    return new RunnableActionScheduler.Sequential();
                case UNIFORM:
                    return new RunnableActionScheduler.RandomUniform(random);
                case RANDOM_WALK:
                    return new RunnableActionScheduler.RandomWalk(random);
                default:
                    throw new AssertionError();
            }
        };
    }

    /** Replaces the scheduler factory. */
    public Builder<S> scheduler(SchedulerFactory schedulerFactory)
    {
        this.schedulerFactory = schedulerFactory;
        return this;
    }

    /** Sets the supplier of random sources. */
    public Builder<S> random(Supplier<RandomSource> randomSupplier)
    {
        this.randomSupplier = randomSupplier;
        return this;
    }

    /** Sets the memory listener; when non-null the constructor selects the logged heap allocation type. */
    public Builder<S> memoryListener(HeapPool.Logged.Listener memoryListener)
    {
        this.memoryListener = memoryListener;
        return this;
    }

    /** Sets the listener handed to the simulated time source. */
    public Builder<S> timeListener(SimulatedTime.Listener timeListener)
    {
        this.timeListener = timeListener;
        return this;
    }

    /** Sets {@link #capture}. */
    public Builder<S> capture(Capture capture)
    {
        this.capture = capture;
        return this;
    }

    /** Returns {@link #capture}. */
    public Capture capture()
    {
        return capture;
    }

    /** Sets the callback given to the thread-local-random check and the global method interceptors. */
    public Builder<S> onThreadLocalRandomCheck(LongConsumer runnable)
    {
        this.onThreadLocalRandomCheck = runnable;
        return this;
    }

    /**
     * Creates a simulation from the current settings.
     *
     * @param seed the seed for the simulation
     * @throws IOException if the simulation cannot be created
     */
    public abstract ClusterSimulation<S> create(long seed) throws IOException;
}
/**
 * Splits a fixed budget of threads first between the nodes of a cluster and then between the
 * thread pools of each node.
 * <p>
 * {@link #update(IInstanceConfig)} must be called once per node: it opens a new per-node pool via
 * {@link #cycle()} and then draws each configured pool size from it with {@link #take(int, int, int)}.
 * Every draw is subtracted from the pool it was taken from, so later draws see only what is left,
 * and whatever a node does not use (or over-uses, as a negative amount) is returned to the
 * cluster-wide pool when the next node is started.
 */
static class ThreadAllocator
{
    final RandomSource random;
    int clusterPool; // number of threads we have left for the whole cluster
    int remainingNodes; // number of nodes we still need to allocate them to
    int allocationPool; // threads to allocate for the node we're currently processing
    int remainingAllocations; // number of _remaining_ allocations take() assumes we want to evenly allocate threads over

    /**
     * @param random source of randomness for the individual allocations
     * @param threadsToAllocate total number of threads available to the whole cluster
     * @param betweenNodes number of nodes the threads are to be shared between
     */
    public ThreadAllocator(RandomSource random, int threadsToAllocate, int betweenNodes)
    {
        this.random = random;
        this.clusterPool = threadsToAllocate;
        this.remainingNodes = betweenNodes;
    }

    /**
     * Randomly sets the number of threads in various thread pools of one node.
     *
     * @param config the node's configuration; the values are written into it
     * @return the value returned by the final {@code forceSet} call on {@code config}
     */
    IInstanceConfig update(IInstanceConfig config)
    {
        cycle();
        // allocate in ascending order of max, for take() correctness
        // NOTE: the first arguments below must sum to the constant assigned in cycle()
        return config
               .set("memtable_flush_writers", take(1, 1, 2))
               .set("concurrent_compactors", take(1, 1, 4))
               .set("concurrent_writes", take(1, 4))
               .set("concurrent_counter_writes", take(1, 4))
               .set("concurrent_materialized_view_writes", take(1, 4))
               .set("concurrent_reads", take(1, 4))
               .forceSet("available_processors", take(3, 4));
    }

    /**
     * Begins allocating for a new node: returns the previous node's leftovers to the cluster pool,
     * then carves this node's pool out of the cluster pool.
     */
    void cycle()
    {
        assert remainingNodes > 0;
        // return unallocated items to the outerPool (negative if the previous node over-allocated)
        clusterPool += allocationPool;
        // set the curPool to allocate the next allocationPool size
        allocationPool = clusterPool;
        remainingAllocations = remainingNodes;
        // randomly select the next pool size, subtracting it from the outer pool;
        // the assignment overwrites the deduction take() made against the temporary pool
        allocationPool = take(1, 1);
        clusterPool -= allocationPool;
        // this is hard-coded to match the sum of the first arguments in update()
        remainingAllocations = 9;
        --remainingNodes;
    }

    /**
     * As {@link #take(int, int, int)}, with the current size of the pool as the upper bound.
     */
    int take(int times, int min)
    {
        return take(times, min, allocationPool);
    }

    /**
     * Allocate a random number of threads between [min..max)
     * The allocation is suitable for multiple users of the value, i.e.
     * {@code times} multiple of the result are deducted from the pool.
     *
     * If there are adequate supplies we aim to allocate threads "equally" between pools,
     * selecting a uniform value between 0.5x and 2x the fair split of the remaining pool
     * on each allocation. If the min/max bounds override that, they are preferred.
     *
     * The minimum is always honoured, regardless of available pool size; the pool may
     * therefore become negative, in which case all later allocations receive their minimum.
     *
     * @param times number of consumers that will each use the returned value
     * @param min smallest value that may be returned
     * @param max upper bound for the returned value when supplies are adequate
     * @return the number of threads granted to each of the {@code times} consumers
     */
    int take(int times, int min, int max)
    {
        int remaining = remainingAllocations;
        assert remaining >= times;
        remainingAllocations -= times;

        int each = select(times, remaining, min, max);
        // account for what was handed out so that later allocations only see the remainder
        allocationPool -= times * each;
        return each;
    }

    /**
     * Chooses the per-consumer value for {@link #take(int, int, int)} without modifying any state.
     */
    private int select(int times, int remaining, int min, int max)
    {
        // not enough left to give every remaining allocation more than its minimum: honour the minimum
        if (remaining * min >= allocationPool)
            return min;

        // last allocation: share out everything that is left
        if (times == remaining)
            return allocationPool / remaining;

        // exactly one allocation follows this one: leave it something within [min..max]
        if (times + 1 == remaining)
        {
            int lo = Math.max(min, (allocationPool - max) / times);
            int hi = Math.min(max, (allocationPool - min) / times);
            // never ask the random source for an empty or inverted range
            return lo >= hi ? lo : random.uniform(lo, hi);
        }

        int median = allocationPool / remaining;
        min = Math.max(min, Math.min(max, median) / 2);
        max = Math.min(max, median * 2);
        return min >= max ? min : random.uniform(min, max);
    }
}
// the random source supplied to the constructor
public final RandomSource random;
// the simulated systems assembled by the constructor
public final SimulatedSystems simulated;
// the cluster, created by the constructor without being started
public final Cluster cluster;
// the simulation produced by the SimulationFactory
public final S simulation;
// in-memory file system created by the constructor; the cluster root ("/cassandra") lives on it
private final FileSystem jimfs;
// per-node cleanup actions keyed by instance number; close() runs them for nodes that are not already shut down
protected final Map<Integer, List<Closeable>> onUnexpectedShutdown = new TreeMap<>();
// shutdown calls captured from the cluster's shutdown executor; close() invokes them last
protected final List<Callable<Void>> onShutdown = new CopyOnWriteArrayList<>();
// created from the builder's onThreadLocalRandomCheck callback; stopped by close()
protected final ThreadLocalRandomCheck threadLocalRandomCheck;
/**
 * Builds (without starting) a cluster on an in-memory file system, wires the simulated systems
 * into every instance, and finally asks {@code factory} for the simulation to run.
 *
 * @param random source of randomness for every choice made here; also passed on to the simulated systems
 * @param seed used here only as part of the in-memory file system's name
 * @param uniqueNum used here only as part of the in-memory file system's name
 * @param builder supplies the ranges, chances, timeouts and listeners read below
 * @param configUpdater applied to each instance config after the fixed settings and thread allocation
 * @param factory creates the simulation once the cluster and simulated systems exist
 * @throws IOException declared by this constructor; NOTE(review): the throwing call is not evident from this file
 */
public ClusterSimulation(RandomSource random, long seed, int uniqueNum,
Builder<?> builder,
Consumer<IInstanceConfig> configUpdater,
SimulationFactory<S> factory) throws IOException
{
this.random = random;
// in-memory unix-style file system named "<seed in hex>x<uniqueNum>", capped at 4L << 30 bytes with 512-byte blocks
this.jimfs = Jimfs.newFileSystem(Long.toHexString(seed) + 'x' + uniqueNum, Configuration.unix().toBuilder()
.setMaxSize(4L << 30).setBlockSize(512)
.build());
final SimulatedMessageDelivery delivery;
final SimulatedExecution execution;
final SimulatedBallots ballots;
final SimulatedSnitch snitch;
final SimulatedTime time;
final SimulatedFailureDetector failureDetector;
// choose the topology: node count first, then a DC count bounded by numOfNodes / 4
// NOTE(review): presumably this bound keeps every DC at >= 4 nodes, i.e. above the fixed minRf of 3 - confirm IntRange.select semantics
int numOfNodes = builder.nodeCount.select(random);
int numOfDcs = builder.dcCount.select(random, 0, numOfNodes / 4);
int[] numInDcs = new int[numOfDcs];
int[] nodeToDc = new int[numOfNodes];
int[] minRf = new int[numOfDcs], initialRf = new int[numOfDcs], maxRf = new int[numOfDcs];
{
// TODO (feature): split unevenly
// n = next node index to assign, nc = cumulative node count up to and including the current DC
int n = 0, nc = 0;
for (int i = 0; i < numOfDcs; ++i)
{
// even split; the first (numOfNodes % numOfDcs) DCs take one extra node
int numInDc = (numOfNodes / numOfDcs) + (numOfNodes % numOfDcs > i ? 1 : 0);
numInDcs[i] = numInDc;
minRf[i] = 3;
maxRf[i] = min(numInDc, 9);
// NOTE(review): the "1 +" suggests uniform()'s upper bound is exclusive, making maxRf attainable - confirm
initialRf[i] = random.uniform(minRf[i], 1 + maxRf[i]);
nc += numInDc;
while (n < nc)
nodeToDc[n++] = i;
}
}
snitch = new SimulatedSnitch(nodeToDc, numInDcs);
execution = new SimulatedExecution();
KindOfSequence kindOfDriftSequence = Choices.uniform(KindOfSequence.values()).choose(random);
KindOfSequence kindOfDiscontinuitySequence = Choices.uniform(KindOfSequence.values()).choose(random);
// 1577836800000L is 2020-01-01T00:00:00Z in epoch milliseconds
time = new SimulatedTime(numOfNodes, random, 1577836800000L /*Jan 1st UTC*/, builder.clockDriftNanos, kindOfDriftSequence,
kindOfDiscontinuitySequence.period(builder.clockDiscontinuitIntervalNanos, random),
builder.timeListener);
// each supplier requested from here draws its own upper bound once, then yields values below it
ballots = new SimulatedBallots(random, () -> {
long max = random.uniform(2, 16);
return () -> random.uniform(1, max);
});
// the same predicate decides which classes are shared by the cluster; its negation is given to the class transformer
Predicate<String> sharedClassPredicate = getSharedClassPredicate(ISOLATE, SHARE, ANY, SIMULATION);
InterceptAsClassTransformer interceptClasses = new InterceptAsClassTransformer(builder.monitorDelayChance.asSupplier(random), builder.nemesisChance.asSupplier(random), NemesisFieldSelectors.get(), ClassLoader.getSystemClassLoader(), sharedClassPredicate.negate());
threadLocalRandomCheck = new ThreadLocalRandomCheck(builder.onThreadLocalRandomCheck);
Failures failures = new Failures();
ThreadAllocator threadAllocator = new ThreadAllocator(random, builder.threadCount, numOfNodes);
cluster = snitch.setup(Cluster.build(numOfNodes)
.withRoot(jimfs.getPath("/cassandra"))
.withSharedClasses(sharedClassPredicate)
// per-instance config: fixed settings first, then thread pool sizes from the allocator, then the caller's updater
.withConfig(config -> configUpdater.accept(threadAllocator.update(config
.with(Feature.BLANK_GOSSIP)
.set("read_request_timeout", String.format("%dms", NANOSECONDS.toMillis(builder.readTimeoutNanos)))
.set("write_request_timeout", String.format("%dms", NANOSECONDS.toMillis(builder.writeTimeoutNanos)))
.set("cas_contention_timeout", String.format("%dms", NANOSECONDS.toMillis(builder.contentionTimeoutNanos)))
.set("request_timeout", String.format("%dms", NANOSECONDS.toMillis(builder.requestTimeoutNanos)))
.set("memtable_heap_space", "1MiB")
.set("memtable_allocation_type", builder.memoryListener != null ? "unslabbed_heap_buffers_logged" : "heap_buffers")
.set("file_cache_size", "16MiB")
.set("use_deterministic_table_id", true)
.set("disk_access_mode", "standard")
.set("failure_detector", SimulatedFailureDetector.Instance.class.getName())
.set("commitlog_compression", new ParameterizedClass(LZ4Compressor.class.getName(), emptyMap()))
)))
.withInstanceInitializer(new IInstanceInitializer()
{
// installs the interceptors inside the instance's class loader and records how to undo each step
@Override
public void initialise(ClassLoader classLoader, ThreadGroup threadGroup, int num, int generation)
{
// local list; shadows the outer onShutdown field and is registered per node at the end of this method
List<Closeable> onShutdown = new ArrayList<>();
InterceptorOfGlobalMethods interceptorOfGlobalMethods = IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableQuadFunction<Capture, LongConsumer, Consumer<Throwable>, RandomSource, InterceptorOfGlobalMethods>) InterceptingGlobalMethods::new, classLoader)
.apply(builder.capture, builder.onThreadLocalRandomCheck, failures, random);
onShutdown.add(interceptorOfGlobalMethods);
InterceptingExecutorFactory factory = execution.factory(interceptorOfGlobalMethods, classLoader, threadGroup);
IsolatedExecutor.transferAdhoc((SerializableConsumer<ExecutorFactory>) ExecutorFactory.Global::unsafeSet, classLoader)
.accept(factory);
onShutdown.add(factory);
// the IntSupplier fails the simulation when called from a deterministic thread, otherwise returns a random int
IsolatedExecutor.transferAdhoc((SerializableBiConsumer<InterceptorOfGlobalMethods, IntSupplier>) InterceptorOfGlobalMethods.Global::unsafeSet, classLoader)
.accept(interceptorOfGlobalMethods, () -> {
if (InterceptibleThread.isDeterministic())
throw failWithOOM();
return random.uniform(Integer.MIN_VALUE, Integer.MAX_VALUE);
});
onShutdown.add(IsolatedExecutor.transferAdhoc((SerializableRunnable)InterceptorOfGlobalMethods.Global::unsafeReset, classLoader)::run);
onShutdown.add(time.setup(num, classLoader));
onUnexpectedShutdown.put(num, onShutdown);
}
// sets per-instance hooks before the instance starts; each hook with a reset action registers it for close()
@Override
public void beforeStartup(IInstance i)
{
((IInvokableInstance) i).unsafeAcceptOnThisThread(FBUtilities::setAvailableProcessors, i.config().getInt("available_processors"));
((IInvokableInstance) i).unsafeAcceptOnThisThread(IfInterceptibleThread::setThreadLocalRandomCheck, (LongConsumer) threadLocalRandomCheck);
int num = i.config().num();
if (builder.memoryListener != null)
{
((IInvokableInstance) i).unsafeAcceptOnThisThread(HeapPool.Logged::setListener, builder.memoryListener);
onUnexpectedShutdown.get(num).add(() -> ((IInvokableInstance) i).unsafeAcceptOnThisThread(HeapPool.Logged::setListener, (ignore1, ignore2) -> {}));
}
((IInvokableInstance) i).unsafeAcceptOnThisThread(PaxosPrepare::setOnLinearizabilityViolation, SimulatorUtils::failWithOOM);
onUnexpectedShutdown.get(num).add(() -> ((IInvokableInstance) i).unsafeRunOnThisThread(() -> PaxosPrepare.setOnLinearizabilityViolation(null)));
}
// after startup: install the simulated ballot generator and route leak reports to `failures`, registering a reset for each
@Override
public void afterStartup(IInstance i)
{
int num = i.config().num();
((IInvokableInstance) i).unsafeAcceptOnThisThread(BallotGenerator.Global::unsafeSet, (BallotGenerator) ballots.get());
onUnexpectedShutdown.get(num).add(() -> ((IInvokableInstance) i).unsafeRunOnThisThread(() -> BallotGenerator.Global.unsafeSet(new BallotGenerator.Default())));
((IInvokableInstance) i).unsafeAcceptOnThisThread((SerializableConsumer<BufferPool.DebugLeaks>) debug -> BufferPools.forChunkCache().debug(null, debug), failures);
onUnexpectedShutdown.get(num).add(() -> ((IInvokableInstance) i).unsafeRunOnThisThread(() -> BufferPools.forChunkCache().debug(null, null)));
((IInvokableInstance) i).unsafeAcceptOnThisThread((SerializableConsumer<BufferPool.DebugLeaks>) debug -> BufferPools.forNetworking().debug(null, debug), failures);
onUnexpectedShutdown.get(num).add(() -> ((IInvokableInstance) i).unsafeRunOnThisThread(() -> BufferPools.forNetworking().debug(null, null)));
((IInvokableInstance) i).unsafeAcceptOnThisThread((SerializableConsumer<Ref.OnLeak>) Ref::setOnLeak, failures);
onUnexpectedShutdown.get(num).add(() -> ((IInvokableInstance) i).unsafeRunOnThisThread(() -> Ref.setOnLeak(null)));
}
}).withClassTransformer(interceptClasses)
// shutdown calls are not executed here: they are queued and run at the end of close()
.withShutdownExecutor((name, classLoader, shuttingDown, call) -> {
onShutdown.add(call);
return null;
})
).createWithoutStarting();
// also install the check for the current (non-instance) class loader
IfInterceptibleThread.setThreadLocalRandomCheck(threadLocalRandomCheck);
snitch.setup(cluster);
DirectStreamingConnectionFactory.setup(cluster);
delivery = new SimulatedMessageDelivery(cluster);
failureDetector = new SimulatedFailureDetector(cluster);
SimulatedFutureActionScheduler futureActionScheduler = builder.futureActionScheduler(numOfNodes, time, random);
simulated = new SimulatedSystems(random, time, delivery, execution, ballots, failureDetector, snitch, futureActionScheduler, builder.debug, failures);
simulated.register(futureActionScheduler);
RunnableActionScheduler scheduler = builder.schedulerFactory.create(random);
ClusterActions.Options options = new ClusterActions.Options(builder.topologyChangeLimit, Choices.uniform(KindOfSequence.values()).choose(random).period(builder.topologyChangeIntervalNanos, random),
Choices.random(random, builder.topologyChanges),
minRf, initialRf, maxRf, null);
simulation = factory.create(simulated, scheduler, cluster, options);
}
/**
 * Tears down the simulation and its cluster.
 * <p>
 * In order: reflectively reinstalls a {@code Clock.Default} as the global clock instance, stops the
 * thread-local-random check, force-stops simulated execution and disables simulated time; then runs
 * the cleanup registered in {@link #onUnexpectedShutdown} for every node that is not already shut
 * down, closes the simulation, closes the cluster, and finally invokes the calls queued in
 * {@link #onShutdown}.
 * <p>
 * A failure in any cleanup step does not abort the remaining steps; failures are merged and
 * reported together once all steps have been attempted.
 *
 * @throws IOException if the merged failure is an {@code IOException}
 * @throws RuntimeException if the global clock cannot be reset reflectively (no cleanup is attempted in that case)
 */
public synchronized void close() throws IOException
{
    // Re-enable time on shutdown
    // NOTE(review): looking up Field's "modifiers" relies on JDK internals; JDK 12+ filters that field
    // from reflection, in which case this throws before any cleanup below runs - confirm target JDK
    try
    {
        Field field = Clock.Global.class.getDeclaredField("instance");
        field.setAccessible(true);

        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

        field.set(null, new Clock.Default());
    }
    catch (NoSuchFieldException|IllegalAccessException e)
    {
        throw new RuntimeException(e);
    }

    threadLocalRandomCheck.stop();
    simulated.execution.forceStop();
    SimulatedTime.Global.disable();

    Throwable fail = null;

    // undo the per-instance hooks of any node that is still running (instance numbers are 1-based)
    for (int num = 1 ; num <= cluster.size() ; ++num)
    {
        if (!cluster.get(num).isShutdown())
        {
            fail = Throwables.close(fail, onUnexpectedShutdown.get(num));
        }
    }

    try
    {
        simulation.close();
    }
    catch (Throwable t)
    {
        // merge rather than assign, so failures collected by the loop above are not lost
        fail = Throwables.merge(fail, t);
    }

    try
    {
        cluster.close();
    }
    catch (Throwable t)
    {
        fail = Throwables.merge(fail, t);
    }

    // run the shutdown calls that the cluster's shutdown executor queued instead of executing
    for (Callable<Void> call : onShutdown)
    {
        try
        {
            call.call();
        }
        catch (Throwable t)
        {
            fail = Throwables.merge(fail, t);
        }
    }

    Throwables.maybeFail(fail, IOException.class);
}
}