CASSANDRA-17013: CEP-10 Simulator Improvements
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IClassTransformer.java b/src/main/java/org/apache/cassandra/distributed/api/IClassTransformer.java
new file mode 100644
index 0000000..fe718cd
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IClassTransformer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+public interface IClassTransformer {
+
+ /**
+ * Modify the bytecode of the provided class. Provides the original bytecode and the fully qualified name of the class.
+ * Note, bytecode may be null indicating the class definition could not be found. In this case a synthetic definition
+ * may be returned, or null.
+ */
+ byte[] transform(String name, byte[] bytecode);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
index 3fc056e..4af4ae5 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -56,6 +56,15 @@
IMessageFilters filters();
+ default void setMessageSink(IMessageSink messageSink) { throw new UnsupportedOperationException(); }
+
+ default void deliverMessage(InetSocketAddress to, IMessage msg)
+ {
+ IInstance toInstance = get(to);
+ if (toInstance != null)
+ toInstance.receiveMessage(msg);
+ }
+
/**
* dynamically sets the current uncaught exceptions filter
*
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
index 8ac6235..2ff0842 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -21,6 +21,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.apache.cassandra.distributed.shared.Metrics;
@@ -83,9 +84,12 @@
// these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
void startup(ICluster cluster);
+ default void postStartup() {}
void receiveMessage(IMessage message);
+ void receiveMessageWithInvokingThread(IMessage message);
+
int getMessagingVersion();
void setMessagingVersion(InetSocketAddress addressAndPort, int version);
@@ -96,6 +100,8 @@
void forceCompact(String keyspace, String table);
+ default Executor executorFor(int verb) { throw new UnsupportedOperationException(); }
+
default boolean getLogsEnabled()
{
try
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
index ba1a09b..f88e761 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -61,6 +61,8 @@
IInstanceConfig set(String fieldName, Object value);
+ default IInstanceConfig forceSet(String fieldName, Object value) { throw new UnsupportedOperationException(); }
+
Object get(String fieldName);
String getString(String fieldName);
@@ -69,7 +71,9 @@
boolean has(Feature featureFlag);
- public IInstanceConfig forVersion(Semver series);
+ IInstanceConfig forVersion(Semver series);
+
+ Map<String, Object> getParams();
public static class ParameterizedClass
{
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceInitializer.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceInitializer.java
new file mode 100644
index 0000000..5acc067
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceInitializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface IInstanceInitializer
+{
+ default void initialise(ClassLoader classLoader, int num) { throw new UnsupportedOperationException(); }
+ void initialise(ClassLoader classLoader, ThreadGroup threadGroup, int num, int generation);
+ default void beforeStartup(IInstance instance) {}
+ default void afterStartup(IInstance instance) {}
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
index dff5a62..29b0ea2 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
@@ -47,19 +47,115 @@
default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
+ default <I1> void acceptOnInstance(SerializableConsumer<I1> consumer, I1 i1) { acceptsOnInstance(consumer).accept(i1); }
default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
+ default <I1, I2> void acceptOnInstance(SerializableBiConsumer<I1, I2> consumer, I1 i1, I2 i2) { acceptsOnInstance(consumer).accept(i1, i2); }
+
+ default <I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> asyncAcceptsOnInstance(SerializableTriConsumer<I1, I2, I3> consumer) { return async(transfer(consumer)); }
+ default <I1, I2, I3> TriConsumer<I1, I2, I3> acceptsOnInstance(SerializableTriConsumer<I1, I2, I3> consumer) { return sync(transfer(consumer)); }
+ default <I1, I2, I3> void acceptOnInstance(SerializableTriConsumer<I1, I2, I3> consumer, I1 i1, I2 i2, I3 i3) { acceptsOnInstance(consumer).accept(i1, i2, i3); }
default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
+ default <I1, O> O applyOnInstance(SerializableFunction<I1, O> f, I1 i1) { return sync(transfer(f)).apply(i1); }
default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
+ default <I1, I2, O> O applyOnInstance(SerializableBiFunction<I1, I2, O> f, I1 i1, I2 i2) { return sync(transfer(f)).apply(i1, i2); }
default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
+ default <I1, I2, I3, O> O applyOnInstance(SerializableTriFunction<I1, I2, I3, O> f, I1 i1, I2 i2, I3 i3) { return sync(transfer(f)).apply(i1, i2, i3); }
+
+ default <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> asyncAppliesOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f) { return async(transfer(f)); }
+ default <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> appliesOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f) { return sync(transfer(f)); }
+ default <I1, I2, I3, I4, O> O applyOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f, I1 i1, I2 i2, I3 i3, I4 i4) { return sync(transfer(f)).apply(i1, i2, i3, i4); }
+
+ default <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> asyncAppliesOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f) { return async(transfer(f)); }
+ default <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> appliesOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f) { return sync(transfer(f)); }
+ default <I1, I2, I3, I4, I5, O> O applyOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f, I1 i1, I2 i2, I3 i3, I4 i4, I5 i5) { return sync(transfer(f)).apply(i1, i2, i3, i4, i5); }
+
+ /**
+ * {@link #runOnInstance(SerializableRunnable)} on the invoking thread
+ */
+ default void unsafeRunOnThisThread(IIsolatedExecutor.SerializableRunnable invoke)
+ {
+ transfer(invoke).run();
+ }
+
+ /**
+ * {@link #callOnInstance(SerializableCallable)} on the invoking thread
+ */
+ default <O> O unsafeCallOnThisThread(IIsolatedExecutor.SerializableCallable<O> invoke)
+ {
+ return transfer(invoke).call();
+ }
+
+ /**
+ * {@link #acceptOnInstance(SerializableConsumer, Object)} on the invoking thread
+ */
+ default <I> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableConsumer<I> apply, I i1)
+ {
+ transfer(apply).accept(i1);
+ }
+
+ /**
+ * {@link #acceptOnInstance(SerializableBiConsumer, Object, Object)} on the invoking thread
+ */
+ default <I1, I2> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableBiConsumer<I1, I2> apply, I1 i1, I2 i2)
+ {
+ transfer(apply).accept(i1, i2);
+ }
+
+ /**
+ * {@link #acceptOnInstance(SerializableBiConsumer, Object, Object)} on the invoking thread
+ */
+ default <I1, I2, I3> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableTriConsumer<I1, I2, I3> apply, I1 i1, I2 i2, I3 i3)
+ {
+ transfer(apply).accept(i1, i2, i3);
+ }
+
+ /**
+ * {@link #applyOnInstance(SerializableFunction, Object)} on the invoking thread
+ */
+ default <I, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableFunction<I, O> apply, I i1)
+ {
+ return transfer(apply).apply(i1);
+ }
+
+ /**
+ * {@link #applyOnInstance(SerializableBiFunction, Object, Object)} on the invoking thread
+ */
+ default <I1, I2, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableBiFunction<I1, I2, O> apply, I1 i1, I2 i2)
+ {
+ return transfer(apply).apply(i1, i2);
+ }
+
+ /**
+ * {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
+ */
+ default <I1, I2, I3, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableTriFunction<I1, I2, I3, O> apply, I1 i1, I2 i2, I3 i3)
+ {
+ return transfer(apply).apply(i1, i2, i3);
+ }
+
+ /**
+ * {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
+ */
+ default <I1, I2, I3, I4, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableQuadFunction<I1, I2, I3, I4, O> apply, I1 i1, I2 i2, I3 i3, I4 i4)
+ {
+ return transfer(apply).apply(i1, i2, i3, i4);
+ }
+
+ /**
+ * {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
+ */
+ default <I1, I2, I3, I4, I5, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableQuintFunction<I1, I2, I3, I4, I5, O> apply, I1 i1, I2 i2, I3 i3, I4 i4, I5 i5)
+ {
+ return transfer(apply).apply(i1, i2, i3, i4, i5);
+ }
<E extends Serializable> E transfer(E object);
-
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
index c8a66e4..b49ea94 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
@@ -20,6 +20,8 @@
import java.io.Serializable;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
@@ -41,17 +43,33 @@
{
interface CallableNoExcept<O> extends Callable<O> { O call(); }
- interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}
interface SerializableRunnable extends Runnable, Serializable {}
- interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
+
interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
+ interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}
+
+ interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
+ interface TriConsumer<I1, I2, I3> { void accept(I1 i1, I2 i2, I3 i3); }
+ interface SerializableTriConsumer<I1, I2, I3> extends TriConsumer<I1, I2, I3>, Serializable { }
+
+ interface DynamicFunction<IO>
+ {
+ <IO2 extends IO> IO2 apply(IO2 i);
+ }
+ interface SerializableDynamicFunction<IO> extends DynamicFunction<IO>, Serializable {}
+
interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
-
interface TriFunction<I1, I2, I3, O> { O apply(I1 i1, I2 i2, I3 i3); }
-
interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> {}
+ interface QuadFunction<I1, I2, I3, I4, O> { O apply(I1 i1, I2 i2, I3 i3, I4 i4); }
+ interface SerializableQuadFunction<I1, I2, I3, I4, O> extends Serializable, QuadFunction<I1, I2, I3, I4, O> {}
+ interface QuintFunction<I1, I2, I3, I4, I5, O> { O apply(I1 i1, I2 i2, I3 i3, I4 i4, I5 i5); }
+ interface SerializableQuintFunction<I1, I2, I3, I4, I5, O> extends Serializable, QuintFunction<I1, I2, I3, I4, I5, O> {}
+
+ default IIsolatedExecutor with(ExecutorService executor) { throw new UnsupportedOperationException(); }
+ default Executor executor() { throw new UnsupportedOperationException(); }
Future<Void> shutdown();
@@ -96,6 +114,16 @@
<I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);
/**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> async(TriConsumer<I1, I2, I3> consumer);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, I3> TriConsumer<I1, I2, I3> sync(TriConsumer<I1, I2, I3> consumer);
+
+ /**
* Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
*/
<I, O> Function<I, Future<O>> async(Function<I, O> f);
@@ -124,4 +152,24 @@
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> async(QuadFunction<I1, I2, I3, I4, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> sync(QuadFunction<I1, I2, I3, I4, O> f);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> async(QuintFunction<I1, I2, I3, I4, I5, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> sync(QuintFunction<I1, I2, I3, I4, I5, O> f);
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
index df4ef36..2d4ea69 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -25,7 +25,6 @@
interface Filter
{
Filter off();
-
Filter on();
}
@@ -94,6 +93,9 @@
void reset();
+ default boolean hasInbound() { return true; }
+ default boolean hasOutbound() { return true; }
+
/**
* Checks if the message should be delivered. This is expected to run on "inbound", or on the reciever of
* the message (instance.config.num == to).
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageSink.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageSink.java
new file mode 100644
index 0000000..c9661e5
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageSink.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+import java.net.InetSocketAddress;
+
+public interface IMessageSink {
+
+ void accept(InetSocketAddress to, IMessage message);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
index 95e7e5d..d3c3494 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
@@ -18,14 +18,13 @@
package org.apache.cassandra.distributed.shared;
-import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.api.*;
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -33,7 +32,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -51,12 +52,15 @@
private int subnet;
private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
private TokenSupplier tokenSupplier;
- private File root;
+ private Path rootPath;
+ private File rootFile;
private Versions.Version version;
private Consumer<IInstanceConfig> configUpdater;
private ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
+ private Predicate<String> sharedClasses = InstanceClassLoader.getDefaultLoadSharedFilter();
private int broadcastPort = 7012;
- private BiConsumer<ClassLoader, Integer> instanceInitializer = (cl, id) -> {};
+ private IInstanceInitializer instanceInitializer = (cl, tg, num, gen) -> {};
+ private IClassTransformer classTransformer;
private int datadirCount = 3;
private final List<Rack> racks = new ArrayList<>();
private boolean finalised;
@@ -83,7 +87,11 @@
}
public File getRoot() {
- return root;
+ return rootFile != null ? rootFile : rootPath.toFile();
+ }
+
+ public Path getRootPath() {
+ return rootPath != null ? rootPath : rootFile.toPath();
}
public Versions.Version getVersion() {
@@ -98,15 +106,30 @@
return sharedClassLoader;
}
+ public Predicate<String> getSharedClasses() {
+ return sharedClasses;
+ }
+
public int getBroadcastPort() {
return broadcastPort;
}
+ @Deprecated
public BiConsumer<ClassLoader, Integer> getInstanceInitializer()
{
+ return instanceInitializer::initialise;
+ }
+
+ public IInstanceInitializer getInstanceInitializer2()
+ {
return instanceInitializer;
}
+ public IClassTransformer getClassTransformer()
+ {
+ return classTransformer;
+ }
+
public int getDatadirCount()
{
return datadirCount;
@@ -122,10 +145,11 @@
public C createWithoutStarting() throws IOException
{
finaliseBuilder();
- if (root == null)
- root = Files.createTempDirectory("dtests").toFile();
+ if (rootFile == null && rootPath == null)
+ rootPath = Files.createTempDirectory("dtests");
- root.mkdirs();
+ if (rootFile != null) rootFile.mkdirs();
+ else try { Files.createDirectories(rootPath); } catch (FileAlreadyExistsException ignore) { }
// TODO: make token allocation strategy configurable
if (tokenSupplier == null)
@@ -140,6 +164,12 @@
return (B) this;
}
+ public B withSharedClasses(Predicate<String> sharedClasses)
+ {
+ this.sharedClasses = Objects.requireNonNull(sharedClasses, "sharedClasses");
+ return (B) this;
+ }
+
public B withBroadcastPort(int broadcastPort) {
this.broadcastPort = broadcastPort;
return (B) this;
@@ -244,7 +274,13 @@
public B withRoot(File root)
{
- this.root = root;
+ this.rootFile = root;
+ return (B) this;
+ }
+
+ public B withRoot(Path root)
+ {
+ this.rootPath = root;
return (B) this;
}
@@ -260,12 +296,42 @@
return (B) this;
}
+ public B appendConfig(Consumer<IInstanceConfig> updater)
+ {
+ Consumer<IInstanceConfig> prev = configUpdater;
+ Consumer<IInstanceConfig> next = prev == null ? updater : config -> { prev.accept(config); updater.accept(config); };
+ this.configUpdater = next;
+ return (B) this;
+ }
+
public B withInstanceInitializer(BiConsumer<ClassLoader, Integer> instanceInitializer)
{
+ this.instanceInitializer = new IInstanceInitializer() {
+ @Override
+ public void initialise(ClassLoader classLoader, ThreadGroup threadGroup, int num, int generation) {
+ instanceInitializer.accept(classLoader, num);
+ }
+
+ @Override
+ public void initialise(ClassLoader classLoader, int num) {
+ instanceInitializer.accept(classLoader, num);
+ }
+ };
+ return (B) this;
+ }
+
+ public B withInstanceInitializer(IInstanceInitializer instanceInitializer)
+ {
this.instanceInitializer = instanceInitializer;
return (B) this;
}
+ public B withClassTransformer(IClassTransformer classTransformer)
+ {
+ this.classTransformer = classTransformer;
+ return (B) this;
+ }
+
public B withDataDirCount(int datadirCount)
{
assert datadirCount > 0 : "data dir count requires a positive number but given " + datadirCount;
@@ -279,6 +345,7 @@
return;
finalised = true;
+ boolean log = logTopology();
if (!racks.isEmpty())
{
setRacks();
@@ -287,7 +354,7 @@
{
if (nodeIdTopology.size() < nodeCount)
{
- System.out.println("Adjusting node count since nodeIdTopology contains fewer nodes");
+ if (log) System.out.println("Adjusting node count since nodeIdTopology contains fewer nodes");
nodeCount = nodeIdTopology.size();
}
else if (nodeIdTopology.size() > nodeCount)
@@ -295,7 +362,7 @@
if (nodeCount == 0)
nodeCount = nodeIdTopology.size();
else
- System.out.printf("nodeIdTopology configured for %d nodes while nodeCount is %d%n", nodeIdTopology.size(), nodeCount);
+ if (log) System.out.printf("nodeIdTopology configured for %d nodes while nodeCount is %d%n", nodeIdTopology.size(), nodeCount);
}
}
else
@@ -308,13 +375,13 @@
if (nodeCount <= 0)
throw new IllegalStateException("Cluster must have at least one node");
- System.out.println("Node id topology:");
+ if (log) System.out.println("Node id topology:");
for (int i = 1; i <= nodeIdTopology.size(); i++)
{
NetworkTopology.DcAndRack dcAndRack = nodeIdTopology.get(i);
- System.out.printf("node %d: dc = %s, rack = %s%n", i, dcAndRack.dc, dcAndRack.rack);
+ if (log) System.out.printf("node %d: dc = %s, rack = %s%n", i, dcAndRack.dc, dcAndRack.rack);
}
- System.out.printf("Configured node count: %d, nodeIdTopology size: %d%n", nodeCount, nodeIdTopology.size());
+ if (log) System.out.printf("Configured node count: %d, nodeIdTopology size: %d%n", nodeCount, nodeIdTopology.size());
}
private void setRacks()
@@ -359,7 +426,7 @@
assert nodeIdTopology.size() > nodeCount : "withRacks should only ever increase the node count";
if (nodeCount == 0)
nodeCount = nodeIdTopology.size();
- else
+ else if (logTopology())
System.out.printf("Network topology of %s requires more nodes, only starting %s out of %s configured nodes%n", nodeIdTopology, nodeCount, nodeIdTopology.size());
}
}
@@ -387,6 +454,11 @@
this.rackNodeCount = rackNodeCount;
}
}
+
+ private static boolean logTopology()
+ {
+ return !System.getProperty("cassandra.dtest.api.log.topology", "").equals("false");
+ }
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
index c71b75f..7420f5e 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
@@ -18,11 +18,20 @@
package org.apache.cassandra.distributed.shared;
+import org.apache.cassandra.distributed.api.IClassTransformer;
+
import java.io.IOException;
+import java.io.InputStream;
+import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLClassLoader;
+import java.net.URLConnection;
+import java.security.CodeSigner;
+import java.security.CodeSource;
import java.util.Arrays;
+import java.util.function.BiFunction;
import java.util.function.Predicate;
+import java.util.jar.Manifest;
public class InstanceClassLoader extends URLClassLoader
{
@@ -48,20 +57,32 @@
private final int id;
private final ClassLoader sharedClassLoader;
private final Predicate<String> loadShared;
+ private final IClassTransformer transform;
public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
{
this(generation, id, urls, sharedClassLoader, DEFAULT_SHARED_PACKAGES);
}
+ public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader, IClassTransformer transform)
+ {
+ this(generation, id, urls, sharedClassLoader, DEFAULT_SHARED_PACKAGES, transform);
+ }
+
public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader, Predicate<String> loadShared)
{
+ this(generation, id, urls, sharedClassLoader, loadShared, null);
+ }
+
+ public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader, Predicate<String> loadShared, IClassTransformer transform)
+ {
super(urls, null);
this.urls = urls;
this.sharedClassLoader = sharedClassLoader;
this.generation = generation;
this.id = id;
this.loadShared = loadShared == null ? DEFAULT_SHARED_PACKAGES : loadShared;
+ this.transform = transform;
}
public static Predicate<String> getDefaultLoadSharedFilter()
@@ -113,6 +134,76 @@
return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
}
+ protected Class<?> findClass(String name) throws ClassNotFoundException
+ {
+ if (transform == null)
+ return super.findClass(name);
+
+ String pkg = name.substring(0, name.lastIndexOf('.'));
+ String path = name.replace('.', '/').concat(".class");
+ URL url = getResource(path);
+ if (url == null)
+ {
+ byte[] bytes = transform.transform(name, null);
+ if (bytes == null)
+ throw new ClassNotFoundException(name);
+ if (null == getPackage(pkg))
+ definePackage(pkg, null, null, null, null, null, null, null);
+ return defineClass(name, bytes, 0, bytes.length);
+ }
+ try
+ {
+ URLConnection connection = url.openConnection();
+ CodeSigner[] codeSigners = null;
+ Manifest manifest = null;
+ if (connection instanceof JarURLConnection)
+ {
+ manifest = ((JarURLConnection) connection).getManifest();
+ codeSigners = ((JarURLConnection) connection).getJarEntry().getCodeSigners();
+ }
+ if (null == getPackage(pkg))
+ {
+ try
+ {
+ if (manifest != null) definePackage(pkg, manifest, url);
+ else definePackage(pkg, null, null, null, null, null, null, null);
+ }
+ catch (IllegalArgumentException iae)
+ {
+ if (null == getPackage(pkg))
+ throw iae;
+ }
+ }
+ try (InputStream in = connection.getInputStream())
+ {
+ byte[] bytes = new byte[in.available()];
+ int cur, total = 0;
+ while (0 <= (cur = in.read(bytes, total, bytes.length - total)))
+ {
+ total += cur;
+ if (cur == 0 && total == bytes.length)
+ {
+ int next = in.read();
+ if (next < 0)
+ break;
+
+ bytes = Arrays.copyOf(bytes, bytes.length * 2);
+ bytes[total++] = (byte)next;
+ }
+ }
+ if (total < bytes.length)
+ bytes = Arrays.copyOf(bytes, total);
+
+ bytes = transform.transform(name, bytes);
+ return defineClass(name, bytes, 0, bytes.length, new CodeSource(url, codeSigners));
+ }
+ }
+ catch (Throwable t)
+ {
+ throw new ClassNotFoundException(name, t);
+ }
+ }
+
public String toString()
{
return "InstanceClassLoader{" +