Add support for vnodes in jvm-dtest
Patch by David Capwell; reviewed by Alex Petrov, Josh McKenzie for CASSANDRA-17332
diff --git a/pom.xml b/pom.xml
index 534390e..7dc5603 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,12 @@
<version>3.5.10</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.quicktheories</groupId>
+ <artifactId>quicktheories</artifactId>
+ <version>0.26</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
index 4af4ae5..f5ff75d 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -24,13 +24,14 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Iterator;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Stream;
-public interface ICluster<I extends IInstance> extends AutoCloseable
+public interface ICluster<I extends IInstance> extends AutoCloseable, Iterable<I>
{
- public static final String PROPERTY_PREFIX = "cassandra.test";
+ String PROPERTY_PREFIX = "cassandra.test";
void startup();
@@ -54,6 +55,12 @@
Stream<I> stream(String dcName, String rackName);
+ @Override
+ default Iterator<I> iterator()
+ {
+ return stream().iterator();
+ }
+
IMessageFilters filters();
default void setMessageSink(IMessageSink messageSink) { throw new UnsupportedOperationException(); }
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
index 081d06a..b1c5ca7 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
@@ -58,7 +58,7 @@
@Override
public Row next()
{
- row.setResults(iterator.next());
+ row.unsafeSetResults(iterator.next());
return row;
}
});
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java
index ff4efbe..5487d5f 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/Row.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java
@@ -19,10 +19,8 @@
package org.apache.cassandra.distributed.api;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -62,8 +60,24 @@
this.nameIndex = nameIndex;
}
- void setResults(Object[] results)
+ public static Row of(Object... results)
{
+ String[] names = new String[results.length];
+ for (int i = 0; i < names.length; i++)
+ names[i] = "c" + i;
+ Row row = new Row(names);
+ row.setResults(results);
+ return row;
+ }
+
+ void unsafeSetResults(Object[] results)
+ {
+ this.results = results;
+ }
+
+ public void setResults(Object... results)
+ {
+ assert names.length == results.length : "Column names " + Arrays.toString(names) + " does not have the same length as results " + Arrays.toString(results);
this.results = results;
}
@@ -73,7 +87,7 @@
public Row copy()
{
Row copy = new Row(names, nameIndex);
- copy.setResults(results);
+ copy.unsafeSetResults(results);
return copy;
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
index 04509e2..5e58d37 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -59,7 +60,7 @@
* points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
* should be used; this will clone the {@link Row} and return a new object pointing to the same data.
*/
-public class SimpleQueryResult implements QueryResult
+public class SimpleQueryResult implements QueryResult, Iterable<Row>
{
private final String[] names;
private final Object[][] results;
@@ -108,13 +109,18 @@
return new SimpleQueryResult(names, results, filter.and(fn), offset);
}
+ @Override
+ public Iterator<Row> iterator() {
+ return new SimpleQueryResult(names, results, filter, offset);
+ }
+
/**
* Reset the cursor to the start of the query result; if the query result has not been iterated, this has no effect.
*/
public void reset()
{
offset = -1;
- row.setResults(null);
+ row.unsafeSetResults(null);
}
/**
@@ -133,13 +139,13 @@
return false;
while ((offset += 1) < results.length)
{
- row.setResults(results[offset]);
+ row.unsafeSetResults(results[offset]);
if (filter.test(row))
{
return true;
}
}
- row.setResults(null);
+ row.unsafeSetResults(null);
return false;
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
index ebc921c..96f51bc 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
@@ -18,17 +18,44 @@
package org.apache.cassandra.distributed.api;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
public interface TokenSupplier
{
- long token(int nodeId);
+ Collection<String> tokens(int nodeId);
+ @Deprecated
+ default long token(int nodeId)
+ {
+ Collection<String> tokens = tokens(nodeId);
+ assert tokens.size() == 1: "tokens function returned multiple tokens, only expected 1: " + tokens;
+ return Long.parseLong(tokens.stream().findFirst().get());
+ }
+
+ @Deprecated
static TokenSupplier evenlyDistributedTokens(int numNodes)
{
- long increment = (Long.MAX_VALUE / numNodes) * 2;
- return (int nodeId) -> {
- assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
- nodeId, numNodes);
- return Long.MIN_VALUE + 1 + nodeId * increment;
- };
+ return evenlyDistributedTokens(numNodes, 1);
+ }
+
+ static TokenSupplier evenlyDistributedTokens(int numNodes, int numTokens)
+ {
+ long increment = (Long.MAX_VALUE / (numNodes * numTokens)) * 2;
+ List<String>[] tokens = new List[numNodes];
+ for (int i = 0; i < numNodes; i++)
+ tokens[i] = new ArrayList<>(numTokens);
+
+ long value = Long.MIN_VALUE + 1;
+ for (int i = 0; i < numTokens; i++)
+ {
+ for (int nodeId = 1; nodeId <= numNodes; nodeId++)
+ {
+ value += increment;
+ tokens[nodeId - 1].add(Long.toString(value));
+ }
+ }
+ return (int nodeId) -> tokens[nodeId - 1];
}
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
index d3c3494..665cdc5 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
@@ -26,13 +26,13 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -64,6 +64,16 @@
private int datadirCount = 3;
private final List<Rack> racks = new ArrayList<>();
private boolean finalised;
+ private int tokenCount = getDefaultTokenCount();
+ private boolean allowVnodes = true;
+
+ protected int getDefaultTokenCount() {
+ String key = "cassandra.dtest.num_tokens";
+ String value = System.getProperty(key);
+ if (value == null)
+ value = System.getenv(key.replace(".", "_").toUpperCase());
+ return value == null ? 1 : Integer.parseInt(value);
+ }
public AbstractBuilder(Factory<I, C, B> factory)
{
@@ -135,6 +145,14 @@
return datadirCount;
}
+ public int getTokenCount() {
+ return tokenCount;
+ }
+
+ public boolean isAllowVnodes() {
+ return allowVnodes;
+ }
+
public C start() throws IOException
{
C cluster = createWithoutStarting();
@@ -153,7 +171,7 @@
// TODO: make token allocation strategy configurable
if (tokenSupplier == null)
- tokenSupplier = evenlyDistributedTokens(nodeCount);
+ tokenSupplier = evenlyDistributedTokens(nodeCount, tokenCount);
return factory.newCluster((B) this);
}
@@ -181,6 +199,29 @@
return (B) this;
}
+ @Deprecated
+ public B withTokenSupplier(SingleTokenSupplier tokenSupplier)
+ {
+ this.tokenSupplier = tokenSupplier;
+ return (B) this;
+ }
+
+ /**
+ * This class is for source backwards compatability
+ */
+ @Deprecated
+ public interface SingleTokenSupplier extends TokenSupplier
+ {
+ @Override
+ default Collection<String> tokens(int nodeId)
+ {
+ return Collections.singletonList(Long.toString(token(nodeId)));
+ }
+
+ @Override
+ long token(int nodeId);
+ }
+
public B withSubnet(int subnet)
{
this.subnet = subnet;
@@ -339,6 +380,19 @@
return (B) this;
}
+ public B withTokenCount(int tokenCount)
+ {
+ assert tokenCount > 0 : "Token count must be positive; given " + tokenCount;
+ this.tokenCount = tokenCount;
+ return (B) this;
+ }
+
+ public B disallowVNodes()
+ {
+ this.allowVnodes = false;
+ return (B) this;
+ }
+
private void finaliseBuilder()
{
if (finalised)
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
index 1fb7149..f12a7b4 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -101,11 +102,14 @@
public Version get(Semver version)
{
- return versions.get(first(version))
- .stream()
+ Supplier<RuntimeException> onError = () -> new RuntimeException("No version " + version.getOriginalValue() + " found");
+ List<Version> versions = this.versions.get(first(version));
+ if (versions == null)
+ throw onError.get();
+ return versions.stream()
.filter(v -> version.equals(v.version))
.findFirst()
- .orElseThrow(() -> new RuntimeException("No version " + version.getOriginalValue() + " found"));
+ .orElseThrow(onError);
}
private static Semver first(Semver version)
diff --git a/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java b/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java
new file mode 100644
index 0000000..d56df1f
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+import org.junit.jupiter.api.Test;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.quicktheories.QuickTheory.qt;
+
+public class TokenSupplierTest {
+ @Test
+ public void evenlyDistributedTokens() {
+ Gen<Integer> nodeGen = SourceDSL.integers().between(1, 100);
+ Gen<Integer> tokenGen = SourceDSL.integers().between(1, 24);
+ qt().forAll(nodeGen, tokenGen).checkAssert((numNodes, numTokens) -> {
+ TokenSupplier ts = TokenSupplier.evenlyDistributedTokens(numNodes, numTokens);
+ SortedSet<Long> sortedTokens = new TreeSet<>();
+ for (int i = 0; i < numNodes; i++) {
+ Collection<String> tokens = ts.tokens(i + 1);
+ assertThat(tokens).hasSize(numTokens);
+ tokens.forEach(s -> sortedTokens.add(Long.valueOf(s)));
+ }
+ Long previous = null;
+ List<Long> diff = new ArrayList<>(sortedTokens.size() - 1);
+ for (Long token : sortedTokens) {
+ if (previous != null)
+ diff.add(token - previous);
+ previous = token;
+ }
+
+ assertThat(calculateSD(diff)).isLessThan(1_000);
+ });
+ }
+
+ private static double calculateSD(Collection<Long> values)
+ {
+ if (values.isEmpty())
+ return 0;
+ double sum = 0.0;
+ double sd = 0.0;
+
+ for (double num : values)
+ sum += num;
+
+ double mean = sum / values.size();
+
+ for (double num : values)
+ sd += Math.pow(num - mean, 2);
+
+ return Math.sqrt(sd / values.size());
+ }
+}
\ No newline at end of file