Reformat code according to Apache Cassandra styleguide
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
index 3c057f8..43c0f38 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
@@ -18,7 +18,8 @@
package org.apache.cassandra.distributed.api;
-public enum ConsistencyLevel {
+public enum ConsistencyLevel
+{
ANY,
ONE,
TWO,
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
index dffd980..6546d95 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -26,7 +26,8 @@
import java.nio.file.Paths;
import java.util.stream.Stream;
-public interface ICluster<I extends IInstance> extends AutoCloseable {
+public interface ICluster<I extends IInstance> extends AutoCloseable
+{
void startup();
I bootstrap(IInstanceConfig config);
@@ -51,14 +52,16 @@
IMessageFilters filters();
- static void setup() throws Throwable {
+ static void setup() throws Throwable
+ {
setupLogging();
setSystemProperties();
nativeLibraryWorkaround();
processReaperWorkaround();
}
- static void nativeLibraryWorkaround() {
+ static void nativeLibraryWorkaround()
+ {
// Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
// CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
// SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
@@ -66,32 +69,39 @@
System.setProperty("io.netty.transport.noNative", "true");
}
- static void processReaperWorkaround() throws Throwable {
+ static void processReaperWorkaround() throws Throwable
+ {
// Make sure the 'process reaper' thread is initially created under the main classloader,
// otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
// which prevents it from being garbage collected.
new ProcessBuilder().command("true").start().waitFor();
}
- static void setSystemProperties() {
+ static void setSystemProperties()
+ {
System.setProperty("cassandra.ring_delay_ms", Integer.toString(30 * 1000));
System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
}
- static void setupLogging() {
- try {
+ static void setupLogging()
+ {
+ try
+ {
File root = Files.createTempDirectory("in-jvm-dtest").toFile();
root.deleteOnExit();
String testConfPath = "test/conf/logback-dtest.xml";
Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
- if (!logConfPath.toFile().exists()) {
+ if (!logConfPath.toFile().exists())
+ {
Files.copy(new File(testConfPath).toPath(),
logConfPath);
}
System.setProperty("logback.configurationFile", "file://" + logConfPath);
- } catch (IOException e) {
+ }
+ catch (IOException e)
+ {
throw new RuntimeException(e);
}
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
index 3d07a3d..34087d0 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -29,10 +29,14 @@
{
return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays();
}
+
QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+
Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+
Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+
IInstance instance();
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
index 0dd4865..90c8242 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -26,35 +26,46 @@
public interface IInstance extends IIsolatedExecutor
{
ICoordinator coordinator();
+
IListen listen();
void schemaChangeInternal(String query);
+
public Object[][] executeInternal(String query, Object... args);
IInstanceConfig config();
+
InetSocketAddress broadcastAddress();
+
UUID schemaVersion();
void startup();
+
boolean isShutdown();
+
Future<Void> shutdown();
+
Future<Void> shutdown(boolean graceful);
int liveMemberCount();
NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs);
+
default NodeToolResult nodetoolResult(String... commandAndArgs)
{
return nodetoolResult(true, commandAndArgs);
}
- default int nodetool(String... commandAndArgs) {
+
+ default int nodetool(String... commandAndArgs)
+ {
return nodetoolResult(commandAndArgs).getRc();
}
+
void uncaughtException(Thread t, Throwable e);
/**
* Return the number of times the instance tried to call {@link System#exit(int)}.
- *
+ * <p>
* When the instance is shutdown, this state should be saved, but in case not possible should return {@code -1}
* to indicate "unknown".
*/
@@ -62,11 +73,14 @@
// these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
void startup(ICluster cluster);
+
void receiveMessage(IMessage message);
int getMessagingVersion();
+
void setMessagingVersion(InetSocketAddress addressAndPort, int version);
void flush(String keyspace);
+
void forceCompact(String keyspace, String table);
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
index 97c1534..7324816 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -31,14 +31,19 @@
public interface IInstanceConfig
{
IInstanceConfig with(Feature featureFlag);
+
IInstanceConfig with(Feature... flags);
int num();
+
UUID hostId();
+
InetSocketAddress broadcastAddress();
+
NetworkTopology networkTopology();
String localRack();
+
String localDatacenter();
/**
@@ -46,16 +51,21 @@
* from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but
* must use the reflection API to modify the state
*/
- void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>>executor);
+ void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>> executor);
/**
* Validates whether the config properties are within range of accepted values.
*/
void validate();
+
IInstanceConfig set(String fieldName, Object value);
+
Object get(String fieldName);
+
String getString(String fieldName);
+
int getInt(String fieldName);
+
boolean has(Feature featureFlag);
public IInstanceConfig forVersion(Versions.Major major);
@@ -77,8 +87,8 @@
@SuppressWarnings("unchecked")
public ParameterizedClass(Map<String, ?> p)
{
- this((String)p.get(CLASS_NAME),
- p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
+ this((String) p.get(CLASS_NAME),
+ p.containsKey(PARAMETERS) ? (Map<String, String>) ((List<?>) p.get(PARAMETERS)).get(0) : null);
}
@Override
@@ -98,5 +108,4 @@
return class_name + (parameters == null ? "" : parameters.toString());
}
}
-
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
index 1dbc4cc..dff5a62 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
@@ -25,43 +25,41 @@
import java.util.function.Consumer;
import java.util.function.Function;
-import org.apache.cassandra.distributed.api.IInstance;
-
/**
* This version is only supported for a Cluster running the same code as the test environment, and permits
* ergonomic cross-node behaviours, without editing the cross-version API.
- *
+ * <p>
* A lambda can be written tto be invoked on any or all of the nodes.
- *
+ * <p>
* The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will
* not be the same in the alternate version. Even were it not, there would likely be a runtime linkage error given
* any code divergence.
*/
public interface IInvokableInstance extends IInstance
{
- public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
- public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
- public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
+ default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
+ default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
+ default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
- public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
- public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
- public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
+ default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
+ default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
+ default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
- public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
- public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
+ default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
+ default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
- public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
- public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
+ default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
+ default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
- public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
- public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
+ default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
+ default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
- public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
- public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
+ default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
+ default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
- public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
- public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
+ default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
+ default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
- public <E extends Serializable> E transfer(E object);
+ <E extends Serializable> E transfer(E object);
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
index d811b17..c8a66e4 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
@@ -30,28 +30,28 @@
/**
* Represents a clean way to handoff evaluation of some work to an executor associated
* with a node's lifetime.
- *
+ * <p>
* There is no transfer of execution to the parallel class hierarchy.
- *
+ * <p>
* Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class
* to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself.
* Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary.
*/
public interface IIsolatedExecutor
{
- public interface CallableNoExcept<O> extends Callable<O> { public O call(); }
- public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { }
- public interface SerializableRunnable extends Runnable, Serializable {}
- public interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
- public interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
- public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
- public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
- public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
- public interface TriFunction<I1, I2, I3, O>
- {
- O apply(I1 i1, I2 i2, I3 i3);
- }
- public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { }
+ interface CallableNoExcept<O> extends Callable<O> { O call(); }
+
+ interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}
+ interface SerializableRunnable extends Runnable, Serializable {}
+ interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
+ interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
+ interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
+ interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
+ interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
+
+ interface TriFunction<I1, I2, I3, O> { O apply(I1 i1, I2 i2, I3 i3); }
+
+ interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> {}
Future<Void> shutdown();
@@ -124,5 +124,4 @@
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
-
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
index d21c594..18bc5be 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IListen.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
@@ -20,7 +20,10 @@
public interface IListen
{
- interface Cancel { void cancel(); }
+ interface Cancel
+ {
+ void cancel();
+ }
Cancel schema(Runnable onChange);
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
index f75861e..c75b89a 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
@@ -23,15 +23,19 @@
/**
* A cross-version interface for delivering internode messages via message sinks.
- *
+ * <p>
* Message implementations should be serializable so we could load into instances.
*/
public interface IMessage extends Serializable
{
int verb();
+
byte[] bytes();
+
// TODO: need to make this a long
int id();
+
int version();
+
InetSocketAddress from();
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
index f2cd6ee..df4ef36 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -22,18 +22,21 @@
public interface IMessageFilters
{
- public interface Filter
+ interface Filter
{
Filter off();
+
Filter on();
}
- public interface Builder
+ interface Builder
{
- Builder from(int ... nums);
- Builder to(int ... nums);
+ Builder from(int... nums);
+
+ Builder to(int... nums);
Builder verbs(int... verbs);
+
Builder allVerbs();
Builder inbound(boolean inbound);
@@ -53,31 +56,42 @@
* other matchers in the chain will return `true` as well).
*/
Builder messagesMatching(Matcher filter);
+
Filter drop();
}
- public interface Matcher
+ interface Matcher
{
boolean matches(int from, int to, IMessage message);
- static Matcher of(Predicate<IMessage> fn) {
+ static Matcher of(Predicate<IMessage> fn)
+ {
return (from, to, m) -> fn.test(m);
}
}
Builder inbound(boolean inbound);
- default Builder inbound() {
+
+ default Builder inbound()
+ {
return inbound(true);
}
- default Builder outbound() {
+
+ default Builder outbound()
+ {
return inbound(false);
}
- default Builder verbs(int... verbs) {
+
+ default Builder verbs(int... verbs)
+ {
return inbound().verbs(verbs);
}
- default Builder allVerbs() {
+
+ default Builder allVerbs()
+ {
return inbound().allVerbs();
}
+
void reset();
/**
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
index da864e0..fd1d4f8 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
@@ -18,12 +18,11 @@
package org.apache.cassandra.distributed.api;
-import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.shared.Versions;
// this lives outside the api package so that we do not have to worry about inter-version compatibility
public interface IUpgradeableInstance extends IInstance
{
// only to be invoked while the node is shutdown!
- public void setVersion(Versions.Version version);
+ void setVersion(Versions.Version version);
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
index 773f617..8b17c3a 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -18,14 +18,14 @@
package org.apache.cassandra.distributed.api;
-import org.apache.cassandra.distributed.shared.AssertUtils;
-
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.management.Notification;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+
public class NodeToolResult
{
private final String[] commandAndArgs;
@@ -66,28 +66,35 @@
return new Asserts();
}
- public final class Asserts {
- public Asserts success() {
+ public final class Asserts
+ {
+ public Asserts success()
+ {
AssertUtils.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc);
return this;
}
- public Asserts failure() {
+ public Asserts failure()
+ {
AssertUtils.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc);
return this;
}
- public Asserts errorContains(String msg) {
+ public Asserts errorContains(String msg)
+ {
AssertUtils.assertNotNull("No exception was found but expected one", error);
AssertUtils.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage().contains(msg));
return this;
}
- public Asserts notificationContains(String msg) {
+ public Asserts notificationContains(String msg)
+ {
AssertUtils.assertNotNull("notifications not defined", notifications);
AssertUtils.assertFalse("notifications not defined", notifications.isEmpty());
- for (Notification n : notifications) {
- if (n.getMessage().contains(msg)) {
+ for (Notification n : notifications)
+ {
+ if (n.getMessage().contains(msg))
+ {
return this;
}
}
@@ -95,13 +102,17 @@
return this; // unreachable
}
- public Asserts notificationContains(ProgressEventType type, String msg) {
+ public Asserts notificationContains(ProgressEventType type, String msg)
+ {
int userType = type.ordinal();
AssertUtils.assertNotNull("notifications not defined", notifications);
AssertUtils.assertFalse("notifications not defined", notifications.isEmpty());
- for (Notification n : notifications) {
- if (notificationType(n) == userType) {
- if (n.getMessage().contains(msg)) {
+ for (Notification n : notifications)
+ {
+ if (notificationType(n) == userType)
+ {
+ if (n.getMessage().contains(msg))
+ {
return this;
}
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
index dcdfa14..e72d33e 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
@@ -25,7 +25,7 @@
/**
* A table of data representing a complete query result.
- *
+ * <p>
* A <code>QueryResult</code> is different from {@link java.sql.ResultSet} in several key ways:
*
* <ul>
@@ -38,7 +38,7 @@
* </ul>
*
* <h2>Unsafe patterns</h2>
- *
+ * <p>
* Below are a few unsafe patterns which may lead to unexpected results
*
* <code>{@code
@@ -50,7 +50,7 @@
* <code>{@code
* rs.forEach(list::add)
* }</code>
- *
+ * <p>
* Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}. Since the same {@link Row}
* object can be used accross different calls to {@link #hasNext()} this would mean any attempt to access after the fact
* points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java
index 1dd05fe..530edc1 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/Row.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java
@@ -18,11 +18,18 @@
package org.apache.cassandra.distributed.api;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
/**
* Data representing a single row in a query result.
- *
+ * <p>
* This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls
* to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call;
* to get around this, a call to {@link #copy()} will return a new object pointing to the same row.
@@ -36,7 +43,8 @@
{
Objects.requireNonNull(names, "names");
this.nameIndex = new HashMap<>(names.length);
- for (int i = 0; i < names.length; i++) {
+ for (int i = 0; i < names.length; i++)
+ {
nameIndex.put(names[i], i);
}
}
@@ -54,7 +62,8 @@
/**
* Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}.
*/
- public Row copy() {
+ public Row copy()
+ {
Row copy = new Row(nameIndex);
copy.setResults(results);
return copy;
diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
index a714cd5..ebc921c 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
@@ -18,10 +18,12 @@
package org.apache.cassandra.distributed.api;
-public interface TokenSupplier {
+public interface TokenSupplier
+{
long token(int nodeId);
- static TokenSupplier evenlyDistributedTokens(int numNodes) {
+ static TokenSupplier evenlyDistributedTokens(int numNodes)
+ {
long increment = (Long.MAX_VALUE / numNodes) * 2;
return (int nodeId) -> {
assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
index 24c37d4..8e6254a 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
@@ -23,7 +23,8 @@
import java.util.Iterator;
import java.util.List;
-public class AssertUtils {
+public class AssertUtils
+{
public static void assertRows(Object[][] actual, Object[]... expected)
{
@@ -55,16 +56,20 @@
public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
{
- assertRows(actual, new Iterator<Object[]>() {
+ assertRows(actual, new Iterator<Object[]>()
+ {
int i = 0;
+
@Override
- public boolean hasNext() {
+ public boolean hasNext()
+ {
return i < expected.length;
}
@Override
- public Object[] next() {
+ public Object[] next()
+ {
return expected[i++];
}
});
@@ -115,34 +120,39 @@
return expected;
}
- public static void assertEquals(String message, long expected, long actual) {
+ public static void assertEquals(String message, long expected, long actual)
+ {
if (expected != actual)
fail(message);
}
- public static void assertNotEquals(String message, long expected, long actual) {
+ public static void assertNotEquals(String message, long expected, long actual)
+ {
if (expected == actual)
fail(message);
}
- public static void assertNotNull(String message, Object object) {
+ public static void assertNotNull(String message, Object object)
+ {
if (object == null)
fail(message);
}
- public static void assertTrue(String message, boolean condition) {
+ public static void assertTrue(String message, boolean condition)
+ {
if (!condition)
fail(message);
}
- public static void assertFalse(String message, boolean condition) {
+ public static void assertFalse(String message, boolean condition)
+ {
if (condition)
fail(message);
}
- public static void fail(String message) {
+ public static void fail(String message)
+ {
throw new AssertionError(message);
}
-
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
index 6f6adfb..b3b7db0 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
@@ -18,11 +18,6 @@
package org.apache.cassandra.distributed.shared;
-import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.distributed.api.TokenSupplier;
-
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -34,13 +29,20 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+
import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
-public abstract class Builder<I extends IInstance, C extends ICluster> {
+public abstract class Builder<I extends IInstance, C extends ICluster>
+{
private final int BROADCAST_PORT = 7012;
- public interface Factory<I extends IInstance, C extends ICluster> {
+ public interface Factory<I extends IInstance, C extends ICluster>
+ {
C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader);
}
@@ -53,27 +55,31 @@
private Versions.Version version;
private Consumer<IInstanceConfig> configUpdater;
- public Builder(Factory<I, C> factory) {
+ public Builder(Factory<I, C> factory)
+ {
this.factory = factory;
}
- public C start() throws IOException {
+ public C start() throws IOException
+ {
C cluster = createWithoutStarting();
cluster.startup();
return cluster;
}
- public C createWithoutStarting() throws IOException {
+ public C createWithoutStarting() throws IOException
+ {
if (root == null)
root = Files.createTempDirectory("dtests").toFile();
if (nodeCount <= 0)
throw new IllegalStateException("Cluster must have at least one node");
- if (nodeIdTopology == null) {
+ if (nodeIdTopology == null)
+ {
nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
- .collect(Collectors.toMap(nodeId -> nodeId,
- nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
}
root.mkdirs();
@@ -86,7 +92,8 @@
if (tokenSupplier == null)
tokenSupplier = evenlyDistributedTokens(nodeCount);
- for (int i = 0; i < nodeCount; ++i) {
+ for (int i = 0; i < nodeCount; ++i)
+ {
int nodeNum = i + 1;
configs.add(createInstanceConfig(nodeNum));
}
@@ -94,11 +101,13 @@
return factory.newCluster(root, version, configs, sharedClassLoader);
}
- public IInstanceConfig newInstanceConfig(C cluster) {
+ public IInstanceConfig newInstanceConfig(C cluster)
+ {
return createInstanceConfig(cluster.size() + 1);
}
- protected IInstanceConfig createInstanceConfig(int nodeNum) {
+ protected IInstanceConfig createInstanceConfig(int nodeNum)
+ {
String ipPrefix = "127.0." + subnet + ".";
String seedIp = ipPrefix + "1";
String ipAddress = ipPrefix + nodeNum;
@@ -115,26 +124,31 @@
protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp);
- public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) {
+ public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
+ {
this.tokenSupplier = tokenSupplier;
return this;
}
- public Builder<I, C> withSubnet(int subnet) {
+ public Builder<I, C> withSubnet(int subnet)
+ {
this.subnet = subnet;
return this;
}
- public Builder<I, C> withNodes(int nodeCount) {
+ public Builder<I, C> withNodes(int nodeCount)
+ {
this.nodeCount = nodeCount;
return this;
}
- public Builder<I, C> withDCs(int dcCount) {
+ public Builder<I, C> withDCs(int dcCount)
+ {
return withRacks(dcCount, 1);
}
- public Builder<I, C> withRacks(int dcCount, int racksPerDC) {
+ public Builder<I, C> withRacks(int dcCount, int racksPerDC)
+ {
if (nodeCount == 0)
throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
@@ -143,21 +157,25 @@
return withRacks(dcCount, racksPerDC, nodesPerRack);
}
- public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack) {
+ public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
+ {
if (nodeIdTopology != null)
throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
nodeIdTopology = new HashMap<>();
int nodeId = 1;
- for (int dc = 1; dc <= dcCount; dc++) {
- for (int rack = 1; rack <= racksPerDC; rack++) {
+ for (int dc = 1; dc <= dcCount; dc++)
+ {
+ for (int rack = 1; rack <= racksPerDC; rack++)
+ {
for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
}
}
// adjust the node count to match the allocatation
final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
- if (adjustedNodeCount != nodeCount) {
+ if (adjustedNodeCount != nodeCount)
+ {
assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s",
dcCount, racksPerDC, nodesPerRack, adjustedNodeCount));
@@ -166,12 +184,15 @@
return this;
}
- public Builder<I, C> withDC(String dcName, int nodeCount) {
+ public Builder<I, C> withDC(String dcName, int nodeCount)
+ {
return withRack(dcName, rackName(1), nodeCount);
}
- public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack) {
- if (nodeIdTopology == null) {
+ public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
+ {
+ if (nodeIdTopology == null)
+ {
if (nodeCount > 0)
throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
@@ -185,7 +206,8 @@
}
// Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
- public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) {
+ public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
+ {
if (nodeIdTopology.isEmpty())
throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
@@ -194,7 +216,8 @@
throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
});
- if (nodeCount != nodeIdTopology.size()) {
+ if (nodeCount != nodeIdTopology.size())
+ {
nodeCount = nodeIdTopology.size();
System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount));
}
@@ -204,17 +227,20 @@
return this;
}
- public Builder<I, C> withRoot(File root) {
+ public Builder<I, C> withRoot(File root)
+ {
this.root = root;
return this;
}
- public Builder<I, C> withVersion(Versions.Version version) {
+ public Builder<I, C> withVersion(Versions.Version version)
+ {
this.version = version;
return this;
}
- public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater) {
+ public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater)
+ {
this.configUpdater = updater;
return this;
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
index d28af2a..3ec5426 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
@@ -29,7 +29,8 @@
System.gc();
}
- public static void beforeClass() throws Throwable {
+ public static void beforeClass() throws Throwable
+ {
ICluster.setup();
}
@@ -52,6 +53,4 @@
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
return cluster;
}
-
-
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
index c37a520..4f2fc8a 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
@@ -27,18 +27,18 @@
public class InstanceClassLoader extends URLClassLoader
{
private static final Predicate<String> sharePackage = name ->
- name.startsWith("org.apache.cassandra.distributed.api.")
- || name.startsWith("org.apache.cassandra.distributed.shared.")
- || name.startsWith("sun.")
- || name.startsWith("oracle.")
- || name.startsWith("com.intellij.")
- || name.startsWith("com.sun.")
- || name.startsWith("com.oracle.")
- || name.startsWith("java.")
- || name.startsWith("javax.")
- || name.startsWith("jdk.")
- || name.startsWith("netscape.")
- || name.startsWith("org.xml.sax.");
+ name.startsWith("org.apache.cassandra.distributed.api.")
+ || name.startsWith("org.apache.cassandra.distributed.shared.")
+ || name.startsWith("sun.")
+ || name.startsWith("oracle.")
+ || name.startsWith("com.intellij.")
+ || name.startsWith("com.sun.")
+ || name.startsWith("com.oracle.")
+ || name.startsWith("java.")
+ || name.startsWith("javax.")
+ || name.startsWith("jdk.")
+ || name.startsWith("netscape.")
+ || name.startsWith("org.xml.sax.");
private volatile boolean isClosed = false;
private final URL[] urls;
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
index c1731db..62f4f40 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
@@ -87,8 +87,8 @@
{
return (from == null ? 0 : Arrays.hashCode(from))
+ (to == null ? 0 : Arrays.hashCode(to))
- + (verbs == null ? 0 : Arrays.hashCode(verbs)
- + parent.hashCode());
+ + (verbs == null ? 0 : Arrays.hashCode(verbs)
+ + parent.hashCode());
}
public boolean equals(Object that)
@@ -100,8 +100,8 @@
{
return Arrays.equals(from, that.from)
&& Arrays.equals(to, that.to)
- && Arrays.equals(verbs, that.verbs)
- && parent.equals(that.parent);
+ && Arrays.equals(verbs, that.verbs)
+ && parent.equals(that.parent);
}
public Filter off()
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
index 01f8d29..e1c434f 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
@@ -20,9 +20,9 @@
public interface ThrowingRunnable
{
- public void run() throws Throwable;
+ void run() throws Throwable;
- public static Runnable toRunnable(ThrowingRunnable runnable)
+ static Runnable toRunnable(ThrowingRunnable runnable)
{
return () -> {
try
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
index 51c9e49..469022e 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
@@ -18,18 +18,23 @@
package org.apache.cassandra.distributed.shared;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class Versions
{
private static final Logger logger = LoggerFactory.getLogger(Versions.class);
@@ -66,6 +71,7 @@
v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"),
v4("4\\.([0-9]+)");
final Pattern pattern;
+
Major(String verify)
{
this.pattern = Pattern.compile(verify);
@@ -132,6 +138,7 @@
{
this(Major.fromFull(version), version, classpath);
}
+
public Version(Major major, String version, URL[] classpath)
{
this.major = major;
@@ -141,6 +148,7 @@
}
private final Map<Major, List<Version>> versions;
+
public Versions(Map<Major, List<Version>> versions)
{
this.versions = versions;
@@ -149,10 +157,10 @@
public Version get(String full)
{
return versions.get(Major.fromFull(full))
- .stream()
- .filter(v -> full.equals(v.version))
- .findFirst()
- .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
+ .stream()
+ .filter(v -> full.equals(v.version))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
}
public Version getLatest(Major major)
@@ -162,7 +170,7 @@
public static Versions find()
{
- final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build");
+ final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path", "build");
final File sourceDirectory = new File(dtestJarDirectory);
logger.info("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
@@ -177,7 +185,7 @@
continue;
String version = m.group(1);
Major major = Major.fromFull(version);
- versions.get(major).add(new Version(major, version, new URL[] { toURL(file) }));
+ versions.get(major).add(new Version(major, version, new URL[]{ toURL(file) }));
}
for (Map.Entry<Major, List<Version>> e : versions.entrySet())
@@ -202,5 +210,4 @@
throw new IllegalArgumentException(e);
}
}
-
}