CASSANDRA-17013: CEP-10 Simulator Improvements
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IClassTransformer.java b/src/main/java/org/apache/cassandra/distributed/api/IClassTransformer.java
new file mode 100644
index 0000000..fe718cd
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IClassTransformer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+public interface IClassTransformer {
+
+    /**
+     * Modify the bytecode of the provided class. Provides the original bytecode and the fully qualified name of the class.
+     * Note, bytecode may be null indicating the class definition could not be found. In this case a synthetic definition
+     * may be returned, or null.
+     */
+    byte[] transform(String name, byte[] bytecode);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
index 3fc056e..4af4ae5 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -56,6 +56,15 @@
 
     IMessageFilters filters();
 
+    default void setMessageSink(IMessageSink messageSink) { throw new UnsupportedOperationException(); }
+
+    default void deliverMessage(InetSocketAddress to, IMessage msg)
+    {
+        IInstance toInstance = get(to);
+        if (toInstance != null)
+            toInstance.receiveMessage(msg);
+    }
+
     /**
      * dynamically sets the current uncaught exceptions filter
      *
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
index 8ac6235..2ff0842 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -21,6 +21,7 @@
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 
 import org.apache.cassandra.distributed.shared.Metrics;
@@ -83,9 +84,12 @@
 
     // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
     void startup(ICluster cluster);
+    default void postStartup() {}
 
     void receiveMessage(IMessage message);
 
+    void receiveMessageWithInvokingThread(IMessage message);
+
     int getMessagingVersion();
 
     void setMessagingVersion(InetSocketAddress addressAndPort, int version);
@@ -96,6 +100,8 @@
 
     void forceCompact(String keyspace, String table);
 
+    default Executor executorFor(int verb) { throw new UnsupportedOperationException(); }
+
     default boolean getLogsEnabled()
     {
         try
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
index ba1a09b..f88e761 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -61,6 +61,8 @@
 
     IInstanceConfig set(String fieldName, Object value);
 
+    default IInstanceConfig forceSet(String fieldName, Object value) { throw new UnsupportedOperationException(); }
+
     Object get(String fieldName);
 
     String getString(String fieldName);
@@ -69,7 +71,9 @@
 
     boolean has(Feature featureFlag);
 
-    public IInstanceConfig forVersion(Semver series);
+    IInstanceConfig forVersion(Semver series);
+
+    Map<String, Object> getParams();
 
     public static class ParameterizedClass
     {
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceInitializer.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceInitializer.java
new file mode 100644
index 0000000..5acc067
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceInitializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface IInstanceInitializer
+{
+    default void initialise(ClassLoader classLoader, int num) { throw new UnsupportedOperationException(); }
+    void initialise(ClassLoader classLoader, ThreadGroup threadGroup, int num, int generation);
+    default void beforeStartup(IInstance instance) {}
+    default void afterStartup(IInstance instance) {}
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
index dff5a62..29b0ea2 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
@@ -47,19 +47,115 @@
 
     default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
     default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
+    default <I1> void acceptOnInstance(SerializableConsumer<I1> consumer, I1 i1) { acceptsOnInstance(consumer).accept(i1); }
 
     default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
     default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
+    default <I1, I2> void acceptOnInstance(SerializableBiConsumer<I1, I2> consumer, I1 i1, I2 i2) { acceptsOnInstance(consumer).accept(i1, i2); }
+
+    default <I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> asyncAcceptsOnInstance(SerializableTriConsumer<I1, I2, I3> consumer) { return async(transfer(consumer)); }
+    default <I1, I2, I3> TriConsumer<I1, I2, I3> acceptsOnInstance(SerializableTriConsumer<I1, I2, I3> consumer) { return sync(transfer(consumer)); }
+    default <I1, I2, I3> void acceptOnInstance(SerializableTriConsumer<I1, I2, I3> consumer, I1 i1, I2 i2, I3 i3) { acceptsOnInstance(consumer).accept(i1, i2, i3); }
 
     default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
     default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
+    default <I1, O> O applyOnInstance(SerializableFunction<I1, O> f, I1 i1) { return sync(transfer(f)).apply(i1); }
 
     default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
     default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
+    default <I1, I2, O> O applyOnInstance(SerializableBiFunction<I1, I2, O> f, I1 i1, I2 i2) { return sync(transfer(f)).apply(i1, i2); }
 
     default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
     default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
+    default <I1, I2, I3, O> O applyOnInstance(SerializableTriFunction<I1, I2, I3, O> f, I1 i1, I2 i2, I3 i3) { return sync(transfer(f)).apply(i1, i2, i3); }
+
+    default <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> asyncAppliesOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f) { return async(transfer(f)); }
+    default <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> appliesOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f) { return sync(transfer(f)); }
+    default <I1, I2, I3, I4, O> O applyOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f, I1 i1, I2 i2, I3 i3, I4 i4) { return sync(transfer(f)).apply(i1, i2, i3, i4); }
+
+    default <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> asyncAppliesOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f) { return async(transfer(f)); }
+    default <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> appliesOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f) { return sync(transfer(f)); }
+    default <I1, I2, I3, I4, I5, O> O applyOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f, I1 i1, I2 i2, I3 i3, I4 i4, I5 i5) { return sync(transfer(f)).apply(i1, i2, i3, i4, i5); }
+
+    /**
+     * {@link #runOnInstance(SerializableRunnable)} on the invoking thread
+     */
+    default void unsafeRunOnThisThread(IIsolatedExecutor.SerializableRunnable invoke)
+    {
+        transfer(invoke).run();
+    }
+
+    /**
+     * {@link #callOnInstance(SerializableCallable)} on the invoking thread
+     */
+    default <O> O unsafeCallOnThisThread(IIsolatedExecutor.SerializableCallable<O> invoke)
+    {
+        return transfer(invoke).call();
+    }
+
+    /**
+     * {@link #acceptOnInstance(SerializableConsumer, Object)} on the invoking thread
+     */
+    default <I> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableConsumer<I> apply, I i1)
+    {
+        transfer(apply).accept(i1);
+    }
+
+    /**
+     * {@link #acceptOnInstance(SerializableBiConsumer, Object, Object)} on the invoking thread
+     */
+    default <I1, I2> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableBiConsumer<I1, I2> apply, I1 i1, I2 i2)
+    {
+        transfer(apply).accept(i1, i2);
+    }
+
+    /**
+     * {@link #acceptOnInstance(SerializableBiConsumer, Object, Object)} on the invoking thread
+     */
+    default <I1, I2, I3> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableTriConsumer<I1, I2, I3> apply, I1 i1, I2 i2, I3 i3)
+    {
+        transfer(apply).accept(i1, i2, i3);
+    }
+
+    /**
+     * {@link #applyOnInstance(SerializableFunction, Object)} on the invoking thread
+     */
+    default <I, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableFunction<I, O> apply, I i1)
+    {
+        return transfer(apply).apply(i1);
+    }
+
+    /**
+     * {@link #applyOnInstance(SerializableBiFunction, Object, Object)} on the invoking thread
+     */
+    default <I1, I2, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableBiFunction<I1, I2, O> apply, I1 i1, I2 i2)
+    {
+        return transfer(apply).apply(i1, i2);
+    }
+
+    /**
+     * {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
+     */
+    default <I1, I2, I3, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableTriFunction<I1, I2, I3, O> apply, I1 i1, I2 i2, I3 i3)
+    {
+        return transfer(apply).apply(i1, i2, i3);
+    }
+
+    /**
+     * {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
+     */
+    default <I1, I2, I3, I4, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableQuadFunction<I1, I2, I3, I4, O> apply, I1 i1, I2 i2, I3 i3, I4 i4)
+    {
+        return transfer(apply).apply(i1, i2, i3, i4);
+    }
+
+    /**
+     * {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
+     */
+    default <I1, I2, I3, I4, I5, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableQuintFunction<I1, I2, I3, I4, I5, O> apply, I1 i1, I2 i2, I3 i3, I4 i4, I5 i5)
+    {
+        return transfer(apply).apply(i1, i2, i3, i4, i5);
+    }
 
     <E extends Serializable> E transfer(E object);
-
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
index c8a66e4..b49ea94 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
@@ -20,6 +20,8 @@
 
 import java.io.Serializable;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -41,17 +43,33 @@
 {
     interface CallableNoExcept<O> extends Callable<O> { O call(); }
 
-    interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}
     interface SerializableRunnable extends Runnable, Serializable {}
-    interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
+
     interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
+    interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}
+
+    interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
     interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
+    interface TriConsumer<I1, I2, I3> { void accept(I1 i1, I2 i2, I3 i3); }
+    interface SerializableTriConsumer<I1, I2, I3> extends TriConsumer<I1, I2, I3>, Serializable { }
+
+    interface DynamicFunction<IO>
+    {
+        <IO2 extends IO> IO2 apply(IO2 i);
+    }
+    interface SerializableDynamicFunction<IO> extends DynamicFunction<IO>, Serializable {}
+
     interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
     interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
-
     interface TriFunction<I1, I2, I3, O> { O apply(I1 i1, I2 i2, I3 i3); }
-
     interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> {}
+    interface QuadFunction<I1, I2, I3, I4, O> { O apply(I1 i1, I2 i2, I3 i3, I4 i4); }
+    interface SerializableQuadFunction<I1, I2, I3, I4, O> extends Serializable, QuadFunction<I1, I2, I3, I4, O> {}
+    interface QuintFunction<I1, I2, I3, I4, I5, O> { O apply(I1 i1, I2 i2, I3 i3, I4 i4, I5 i5); }
+    interface SerializableQuintFunction<I1, I2, I3, I4, I5, O> extends Serializable, QuintFunction<I1, I2, I3, I4, I5, O> {}
+
+    default IIsolatedExecutor with(ExecutorService executor) { throw new UnsupportedOperationException(); }
+    default Executor executor() { throw new UnsupportedOperationException(); }
 
     Future<Void> shutdown();
 
@@ -96,6 +114,16 @@
     <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);
 
     /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> async(TriConsumer<I1, I2, I3> consumer);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I1, I2, I3> TriConsumer<I1, I2, I3> sync(TriConsumer<I1, I2, I3> consumer);
+
+    /**
      * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
      */
     <I, O> Function<I, Future<O>> async(Function<I, O> f);
@@ -124,4 +152,24 @@
      * Convert the execution to one performed synchronously on the IsolatedExecutor
      */
     <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> async(QuadFunction<I1, I2, I3, I4, O> f);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> sync(QuadFunction<I1, I2, I3, I4, O> f);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> async(QuintFunction<I1, I2, I3, I4, I5, O> f);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> sync(QuintFunction<I1, I2, I3, I4, I5, O> f);
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
index df4ef36..2d4ea69 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -25,7 +25,6 @@
     interface Filter
     {
         Filter off();
-
         Filter on();
     }
 
@@ -94,6 +93,9 @@
 
     void reset();
 
+    default boolean hasInbound() { return true; }
+    default boolean hasOutbound() { return true; }
+
     /**
      * Checks if the message should be delivered.  This is expected to run on "inbound", or on the reciever of
      * the message (instance.config.num == to).
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageSink.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageSink.java
new file mode 100644
index 0000000..c9661e5
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageSink.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+import java.net.InetSocketAddress;
+
+public interface IMessageSink {
+
+    void accept(InetSocketAddress to, IMessage message);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
index 95e7e5d..d3c3494 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
@@ -18,14 +18,13 @@
 
 package org.apache.cassandra.distributed.shared;
 
-import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.api.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,7 +32,9 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -51,12 +52,15 @@
     private int subnet;
     private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
     private TokenSupplier tokenSupplier;
-    private File root;
+    private Path rootPath;
+    private File rootFile;
     private Versions.Version version;
     private Consumer<IInstanceConfig> configUpdater;
     private ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
+    private Predicate<String> sharedClasses = InstanceClassLoader.getDefaultLoadSharedFilter();
     private int broadcastPort = 7012;
-    private BiConsumer<ClassLoader, Integer> instanceInitializer = (cl, id) -> {};
+    private IInstanceInitializer instanceInitializer = (cl, tg, num, gen) -> {};
+    private IClassTransformer classTransformer;
     private int datadirCount = 3;
     private final List<Rack> racks = new ArrayList<>();
     private boolean finalised;
@@ -83,7 +87,11 @@
     }
 
     public File getRoot() {
-        return root;
+        return rootFile != null ? rootFile : rootPath.toFile();
+    }
+
+    public Path getRootPath() {
+        return rootPath != null ? rootPath : rootFile.toPath();
     }
 
     public Versions.Version getVersion() {
@@ -98,15 +106,30 @@
         return sharedClassLoader;
     }
 
+    public Predicate<String> getSharedClasses() {
+        return sharedClasses;
+    }
+
     public int getBroadcastPort() {
         return broadcastPort;
     }
 
+    @Deprecated
     public BiConsumer<ClassLoader, Integer> getInstanceInitializer()
     {
+        return instanceInitializer::initialise;
+    }
+
+    public IInstanceInitializer getInstanceInitializer2()
+    {
         return instanceInitializer;
     }
 
+    public IClassTransformer getClassTransformer()
+    {
+        return classTransformer;
+    }
+
     public int getDatadirCount()
     {
         return datadirCount;
@@ -122,10 +145,11 @@
     public C createWithoutStarting() throws IOException
     {
         finaliseBuilder();
-        if (root == null)
-            root = Files.createTempDirectory("dtests").toFile();
+        if (rootFile == null && rootPath == null)
+            rootPath = Files.createTempDirectory("dtests");
 
-        root.mkdirs();
+        if (rootFile != null) rootFile.mkdirs();
+        else try { Files.createDirectories(rootPath); } catch (FileAlreadyExistsException ignore) { }
 
         // TODO: make token allocation strategy configurable
         if (tokenSupplier == null)
@@ -140,6 +164,12 @@
         return (B) this;
     }
 
+    public B withSharedClasses(Predicate<String> sharedClasses)
+    {
+        this.sharedClasses = Objects.requireNonNull(sharedClasses, "sharedClasses");
+        return (B) this;
+    }
+
     public B withBroadcastPort(int broadcastPort) {
         this.broadcastPort = broadcastPort;
         return (B) this;
@@ -244,7 +274,13 @@
 
     public B withRoot(File root)
     {
-        this.root = root;
+        this.rootFile = root;
+        return (B) this;
+    }
+
+    public B withRoot(Path root)
+    {
+        this.rootPath = root;
         return (B) this;
     }
 
@@ -260,12 +296,42 @@
         return (B) this;
     }
 
+    public B appendConfig(Consumer<IInstanceConfig> updater)
+    {
+        Consumer<IInstanceConfig> prev = configUpdater;
+        Consumer<IInstanceConfig> next = prev == null ? updater : config -> { prev.accept(config); updater.accept(config); };
+        this.configUpdater = next;
+        return (B) this;
+    }
+
     public B withInstanceInitializer(BiConsumer<ClassLoader, Integer> instanceInitializer)
     {
+        this.instanceInitializer = new IInstanceInitializer() {
+            @Override
+            public void initialise(ClassLoader classLoader, ThreadGroup threadGroup, int num, int generation) {
+                instanceInitializer.accept(classLoader, num);
+            }
+
+            @Override
+            public void initialise(ClassLoader classLoader, int num) {
+                instanceInitializer.accept(classLoader, num);
+            }
+        };
+        return (B) this;
+    }
+
+    public B withInstanceInitializer(IInstanceInitializer instanceInitializer)
+    {
         this.instanceInitializer = instanceInitializer;
         return (B) this;
     }
 
+    public B withClassTransformer(IClassTransformer classTransformer)
+    {
+        this.classTransformer = classTransformer;
+        return (B) this;
+    }
+
     public B withDataDirCount(int datadirCount)
     {
         assert datadirCount > 0 : "data dir count requires a positive number but given " + datadirCount;
@@ -279,6 +345,7 @@
             return;
         finalised = true;
 
+        boolean log = logTopology();
         if (!racks.isEmpty())
         {
             setRacks();
@@ -287,7 +354,7 @@
         {
             if (nodeIdTopology.size() < nodeCount)
             {
-                System.out.println("Adjusting node count since nodeIdTopology contains fewer nodes");
+                if (log) System.out.println("Adjusting node count since nodeIdTopology contains fewer nodes");
                 nodeCount = nodeIdTopology.size();
             }
             else if (nodeIdTopology.size() > nodeCount)
@@ -295,7 +362,7 @@
                 if (nodeCount == 0)
                     nodeCount = nodeIdTopology.size();
                 else
-                    System.out.printf("nodeIdTopology configured for %d nodes while nodeCount is %d%n", nodeIdTopology.size(), nodeCount);
+                if (log) System.out.printf("nodeIdTopology configured for %d nodes while nodeCount is %d%n", nodeIdTopology.size(), nodeCount);
             }
         }
         else
@@ -308,13 +375,13 @@
         if (nodeCount <= 0)
             throw new IllegalStateException("Cluster must have at least one node");
 
-        System.out.println("Node id topology:");
+        if (log) System.out.println("Node id topology:");
         for (int i = 1; i <= nodeIdTopology.size(); i++)
         {
             NetworkTopology.DcAndRack dcAndRack = nodeIdTopology.get(i);
-            System.out.printf("node %d: dc = %s, rack = %s%n", i, dcAndRack.dc, dcAndRack.rack);
+            if (log) System.out.printf("node %d: dc = %s, rack = %s%n", i, dcAndRack.dc, dcAndRack.rack);
         }
-        System.out.printf("Configured node count: %d, nodeIdTopology size: %d%n", nodeCount, nodeIdTopology.size());
+        if (log) System.out.printf("Configured node count: %d, nodeIdTopology size: %d%n", nodeCount, nodeIdTopology.size());
     }
 
     private void setRacks()
@@ -359,7 +426,7 @@
             assert nodeIdTopology.size() > nodeCount : "withRacks should only ever increase the node count";
             if (nodeCount == 0)
                 nodeCount =  nodeIdTopology.size();
-            else
+            else if (logTopology())
                 System.out.printf("Network topology of %s requires more nodes, only starting %s out of %s configured nodes%n", nodeIdTopology, nodeCount, nodeIdTopology.size());
         }
     }
@@ -387,6 +454,11 @@
             this.rackNodeCount = rackNodeCount;
         }
     }
+
+    private static boolean logTopology()
+    {
+        return !System.getProperty("cassandra.dtest.api.log.topology", "").equals("false");
+    }
 }
 
 
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
index c71b75f..7420f5e 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
@@ -18,11 +18,20 @@
 
 package org.apache.cassandra.distributed.shared;
 
+import org.apache.cassandra.distributed.api.IClassTransformer;
+
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.JarURLConnection;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.net.URLConnection;
+import java.security.CodeSigner;
+import java.security.CodeSource;
 import java.util.Arrays;
+import java.util.function.BiFunction;
 import java.util.function.Predicate;
+import java.util.jar.Manifest;
 
 public class InstanceClassLoader extends URLClassLoader
 {
@@ -48,20 +57,32 @@
     private final int id;
     private final ClassLoader sharedClassLoader;
     private final Predicate<String> loadShared;
+    private final IClassTransformer transform;
 
     public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
     {
         this(generation, id, urls, sharedClassLoader, DEFAULT_SHARED_PACKAGES);
     }
 
+    public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader, IClassTransformer transform)
+    {
+        this(generation, id, urls, sharedClassLoader, DEFAULT_SHARED_PACKAGES, transform);
+    }
+
     public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader, Predicate<String> loadShared)
     {
+        this(generation, id, urls, sharedClassLoader, loadShared, null);
+    }
+
+    public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader, Predicate<String> loadShared, IClassTransformer transform)
+    {
         super(urls, null);
         this.urls = urls;
         this.sharedClassLoader = sharedClassLoader;
         this.generation = generation;
         this.id = id;
         this.loadShared = loadShared == null ? DEFAULT_SHARED_PACKAGES : loadShared;
+        this.transform = transform;
     }
 
     public static Predicate<String> getDefaultLoadSharedFilter()
@@ -113,6 +134,76 @@
         return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
     }
 
+    protected Class<?> findClass(String name) throws ClassNotFoundException
+    {
+        if (transform == null)
+            return super.findClass(name);
+
+        String pkg = name.substring(0, name.lastIndexOf('.'));
+        String path = name.replace('.', '/').concat(".class");
+        URL url = getResource(path);
+        if (url == null)
+        {
+            byte[] bytes = transform.transform(name, null);
+            if (bytes == null)
+                throw new ClassNotFoundException(name);
+            if (null == getPackage(pkg))
+                definePackage(pkg, null, null, null, null, null, null, null);
+            return defineClass(name, bytes, 0, bytes.length);
+        }
+        try
+        {
+            URLConnection connection = url.openConnection();
+            CodeSigner[] codeSigners = null;
+            Manifest manifest = null;
+            if (connection instanceof JarURLConnection)
+            {
+                manifest = ((JarURLConnection) connection).getManifest();
+                codeSigners = ((JarURLConnection) connection).getJarEntry().getCodeSigners();
+            }
+            if (null == getPackage(pkg))
+            {
+                try
+                {
+                    if (manifest != null) definePackage(pkg, manifest, url);
+                    else definePackage(pkg, null, null, null, null, null, null, null);
+                }
+                catch (IllegalArgumentException iae)
+                {
+                    if (null == getPackage(pkg))
+                        throw iae;
+                }
+            }
+            try (InputStream in = connection.getInputStream())
+            {
+                byte[] bytes = new byte[in.available()];
+                int cur, total = 0;
+                while (0 <= (cur = in.read(bytes, total, bytes.length - total)))
+                {
+                    total += cur;
+                    if (cur == 0 && total == bytes.length)
+                    {
+                        int next = in.read();
+                        if (next < 0)
+                            break;
+
+                        bytes = Arrays.copyOf(bytes, bytes.length * 2);
+                        bytes[total++] = (byte)next;
+                    }
+                }
+                if (total < bytes.length)
+                    bytes = Arrays.copyOf(bytes, total);
+
+                bytes = transform.transform(name, bytes);
+                return defineClass(name, bytes, 0, bytes.length, new CodeSource(url, codeSigners));
+            }
+        }
+        catch (Throwable t)
+        {
+            throw new ClassNotFoundException(name, t);
+        }
+    }
+
     public String toString()
     {
         return "InstanceClassLoader{" +