blob: 4cceb8fa3ef0cd6f0c3355995702c59e5b92109a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
import com.google.common.collect.Iterators;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.impl.IsolatedExecutor;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
import org.apache.cassandra.simulator.Action;
import org.apache.cassandra.simulator.ActionList;
import org.apache.cassandra.simulator.ActionPlan;
import org.apache.cassandra.simulator.ActionSchedule;
import org.apache.cassandra.simulator.ActionSchedule.Work;
import org.apache.cassandra.simulator.FutureActionScheduler;
import org.apache.cassandra.simulator.RunnableActionScheduler;
import org.apache.cassandra.simulator.ClusterSimulation;
import org.apache.cassandra.simulator.Debug;
import org.apache.cassandra.simulator.RandomSource;
import org.apache.cassandra.simulator.Simulation;
import org.apache.cassandra.simulator.SimulationRunner;
import org.apache.cassandra.simulator.asm.InterceptClasses;
import org.apache.cassandra.simulator.asm.NemesisFieldSelectors;
import org.apache.cassandra.simulator.systems.Failures;
import org.apache.cassandra.simulator.systems.InterceptibleThread;
import org.apache.cassandra.simulator.systems.InterceptingExecutorFactory;
import org.apache.cassandra.simulator.systems.InterceptingGlobalMethods;
import org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods;
import org.apache.cassandra.simulator.systems.SimulatedExecution;
import org.apache.cassandra.simulator.systems.SimulatedQuery;
import org.apache.cassandra.simulator.systems.SimulatedSystems;
import org.apache.cassandra.simulator.systems.SimulatedTime;
import org.apache.cassandra.simulator.utils.LongRange;
import org.apache.cassandra.utils.CloseableIterator;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.simulator.ActionSchedule.Mode.STREAM_LIMITED;
import static org.apache.cassandra.simulator.ActionSchedule.Mode.UNLIMITED;
import static org.apache.cassandra.simulator.ClusterSimulation.ISOLATE;
import static org.apache.cassandra.simulator.ClusterSimulation.SHARE;
import static org.apache.cassandra.simulator.SimulatorUtils.failWithOOM;
import static org.apache.cassandra.simulator.systems.InterceptedWait.CaptureSites.Capture.NONE;
import static org.apache.cassandra.simulator.utils.KindOfSequence.UNIFORM;
import static org.apache.cassandra.utils.Shared.Scope.ANY;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
public class SimulationTestBase
{
static abstract class DTestClusterSimulation implements Simulation
{
final SimulatedSystems simulated;
final RunnableActionScheduler scheduler;
final Cluster cluster;
public DTestClusterSimulation(SimulatedSystems simulated, RunnableActionScheduler scheduler, Cluster cluster)
{
this.simulated = simulated;
this.scheduler = scheduler;
this.cluster = cluster;
}
public Action executeQuery(int node, String query, ConsistencyLevel cl, Object... bindings)
{
return new SimulatedQuery(String.format("Execute query: %s %s %s", query, cl, Arrays.toString(bindings)),
simulated,
cluster.get(node),
query,
cl,
null,
bindings);
}
public Action schemaChange(int node, String query)
{
return new SimulatedQuery(String.format("Schema change: %s", query),
simulated,
cluster.get(node),
query,
org.apache.cassandra.distributed.api.ConsistencyLevel.ALL,
null);
}
protected abstract ActionList initialize();
protected abstract ActionList execute();
public CloseableIterator<?> iterator()
{
return new ActionPlan(ActionList.of(initialize()),
Collections.singletonList(execute()),
ActionList.empty())
.iterator(STREAM_LIMITED, -1, () -> 0L, simulated.time, scheduler, simulated.futureScheduler);
}
public void run()
{
try (CloseableIterator<?> iter = iterator())
{
while (iter.hasNext())
iter.next();
}
}
public void close() throws Exception
{
}
}
public static void simulate(Function<DTestClusterSimulation, ActionList> init,
Function<DTestClusterSimulation, ActionList> test,
Consumer<ClusterSimulation.Builder<DTestClusterSimulation>> configure) throws IOException
{
SimulationRunner.beforeAll();
long seed = System.currentTimeMillis();
RandomSource random = new RandomSource.Default();
random.reset(seed);
class Factory extends ClusterSimulation.Builder<DTestClusterSimulation>
{
public ClusterSimulation<DTestClusterSimulation> create(long seed) throws IOException
{
return new ClusterSimulation<>(random, seed, 1, this,
(c) -> {},
(simulated, scheduler, cluster, options) -> new DTestClusterSimulation(simulated, scheduler, cluster) {
protected ActionList initialize()
{
return init.apply(this);
}
protected ActionList execute()
{
return test.apply(this);
}
});
}
}
Factory factory = new Factory();
configure.accept(factory);
try (ClusterSimulation<?> cluster = factory.create(seed))
{
try
{
cluster.simulation.run();
}
catch (Throwable t)
{
throw new AssertionError(String.format("Failed on seed %s", Long.toHexString(seed)),
t);
}
}
}
public static void simulate(IIsolatedExecutor.SerializableRunnable run,
IIsolatedExecutor.SerializableRunnable check)
{
simulate(new IIsolatedExecutor.SerializableRunnable[]{run},
check);
}
public static void simulate(IIsolatedExecutor.SerializableRunnable[] runnables,
IIsolatedExecutor.SerializableRunnable check)
{
Failures failures = new Failures();
RandomSource random = new RandomSource.Default();
long seed = System.currentTimeMillis();
System.out.println("Using seed: " + seed);
random.reset(seed);
SimulatedTime time = new SimulatedTime(1, random, 1577836800000L /*Jan 1st UTC*/, new LongRange(1, 100, MILLISECONDS, NANOSECONDS),
UNIFORM, UNIFORM.period(new LongRange(10L, 60L, SECONDS, NANOSECONDS), random), (i1, i2) -> {});
SimulatedExecution execution = new SimulatedExecution();
Predicate<String> sharedClassPredicate = AbstractCluster.getSharedClassPredicate(ISOLATE, SHARE, ANY, SIMULATION);
InstanceClassLoader classLoader = new InstanceClassLoader(1, 1, AbstractCluster.CURRENT_VERSION.classpath,
Thread.currentThread().getContextClassLoader(),
sharedClassPredicate,
new InterceptClasses(() -> 1.0f, () -> 1.0f, NemesisFieldSelectors.get(), ClassLoader.getSystemClassLoader(), sharedClassPredicate.negate())::apply);
ThreadGroup tg = new ThreadGroup("test");
InterceptorOfGlobalMethods interceptorOfGlobalMethods = new InterceptingGlobalMethods(NONE, null, failures, random);
InterceptingExecutorFactory factory = execution.factory(interceptorOfGlobalMethods, classLoader, tg);
time.setup(1, classLoader);
IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableConsumer<ExecutorFactory>) ExecutorFactory.Global::unsafeSet, classLoader)
.accept(factory);
IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableBiConsumer<InterceptorOfGlobalMethods, IntSupplier>) InterceptorOfGlobalMethods.Global::unsafeSet, classLoader)
.accept(interceptorOfGlobalMethods, () -> {
if (InterceptibleThread.isDeterministic())
throw failWithOOM();
return random.uniform(Integer.MIN_VALUE, Integer.MAX_VALUE);
});
SimulatedSystems simulated = new SimulatedSystems(random, time, null, execution, null, null, null, new FutureActionScheduler()
{
@Override
public Deliver shouldDeliver(int from, int to)
{
return Deliver.DELIVER;
}
@Override
public long messageDeadlineNanos(int from, int to)
{
return 0;
}
@Override
public long messageTimeoutNanos(long expiresAfterNanos, long expirationIntervalNanos)
{
return 0;
}
@Override
public long messageFailureNanos(int from, int to)
{
return 0;
}
@Override
public long schedulerDelayNanos()
{
return 0;
}
}, new Debug(), failures);
RunnableActionScheduler runnableScheduler = new RunnableActionScheduler.RandomUniform(random);
Action entrypoint = new Action("entrypoint", Action.Modifiers.NONE, Action.Modifiers.NONE)
{
protected ActionList performSimple()
{
Action[] actions = new Action[runnables.length];
for (int i = 0; i < runnables.length; i++)
actions[i] = toAction(runnables[i], classLoader, factory, simulated);
return ActionList.of(actions);
}
};
ActionSchedule testSchedule = new ActionSchedule(simulated.time, simulated.futureScheduler, () -> 0, runnableScheduler, new Work(UNLIMITED, Collections.singletonList(ActionList.of(entrypoint))));
Iterators.advance(testSchedule, Integer.MAX_VALUE);
ActionSchedule checkSchedule = new ActionSchedule(simulated.time, simulated.futureScheduler, () -> 0, runnableScheduler, new Work(UNLIMITED, Collections.singletonList(ActionList.of(toAction(check, classLoader, factory, simulated)))));
Iterators.advance(checkSchedule, Integer.MAX_VALUE);
}
public static Action toAction(IIsolatedExecutor.SerializableRunnable r, ClassLoader classLoader, InterceptingExecutorFactory factory, SimulatedSystems simulated)
{
Runnable runnable = IsolatedExecutor.transferAdhoc(r, classLoader);
return simulated.invoke("action", Action.Modifiers.NONE, Action.Modifiers.NONE,
factory.startParked("begin", runnable));
}
public static <T> T[] arr(T... arr)
{
return arr;
}
}