blob: 0d789c3ef7eb8a395cd395722dae7296813d375b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.routes.tokenrange;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.collect.Range;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.sidecar.IntegrationTestBase;
import org.apache.cassandra.sidecar.adapters.base.Partitioner;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
import org.apache.cassandra.testing.AbstractCassandraTestContext;
import org.apache.cassandra.testing.CassandraIntegrationTest;
import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
import static org.assertj.core.api.Assertions.assertThat;
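/**
* Base class for integration tests of the token range replica mapping endpoint that run with the
* in-jvm dtest framework. Provides shared cluster setup, expected-range generation and validation helpers.
*/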
public class BaseTokenRangeIntegrationTest extends IntegrationTestBase
{
/**
 * Verifies the token ranges reported by a token-range-replicas response.
 *
 * <p>Each replica entry is converted into an open-closed range {@code (start, end]}. The write
 * ranges must equal {@code expectedRanges} element by element, and both the write and the read
 * ranges must pass {@link #validateOrderAndOverlaps(List)}.
 *
 * @param mappingsResponse the response returned by the endpoint
 * @param expectedRanges   the ranges the write replicas are expected to report, in order
 */
protected void validateTokenRanges(TokenRangeReplicasResponse mappingsResponse,
                                   List<Range<BigInteger>> expectedRanges)
{
    // Shared conversion from a replica entry to the (start, end] range it covers
    Function<TokenRangeReplicasResponse.ReplicaInfo, Range<BigInteger>> asRange =
    replica -> Range.openClosed(new BigInteger(replica.start()), new BigInteger(replica.end()));

    List<Range<BigInteger>> writeRanges = mappingsResponse.writeReplicas()
                                                          .stream()
                                                          .map(asRange)
                                                          .collect(Collectors.toList());
    List<Range<BigInteger>> readRanges = mappingsResponse.readReplicas()
                                                         .stream()
                                                         .map(asRange)
                                                         .collect(Collectors.toList());

    assertThat(writeRanges).containsExactlyElementsOf(expectedRanges);

    // Both range lists must be sorted and free of overlaps
    validateOrderAndOverlaps(writeRanges);
    validateOrderAndOverlaps(readRanges);
}
/**
 * Asserts that every pair of consecutive ranges is ordered by upper endpoint, is contiguous,
 * and does not overlap.
 *
 * <p>{@code Range#intersection} throws {@link IllegalArgumentException} when the two ranges are
 * not connected, so connectivity is asserted first. A gap between ranges therefore fails with a
 * descriptive assertion error instead of an opaque exception.
 *
 * @param ranges the ranges to validate, in the order they were reported
 */
private void validateOrderAndOverlaps(List<Range<BigInteger>> ranges)
{
    for (int r = 0; r < ranges.size() - 1; r++)
    {
        Range<BigInteger> current = ranges.get(r);
        Range<BigInteger> next = ranges.get(r + 1);

        assertThat(current.upperEndpoint())
        .as("Upper endpoint of range %s must be less than upper endpoint of following range %s", current, next)
        .isLessThan(next.upperEndpoint());

        // Guard the intersection() call below, which is only defined for connected ranges
        assertThat(current.isConnected(next))
        .as("Consecutive ranges %s and %s must be connected", current, next)
        .isTrue();

        assertThat(current.intersection(next).isEmpty())
        .as("Consecutive ranges %s and %s must not overlap", current, next)
        .isTrue();
    }
}
/**
 * Verifies the replica-state section of a token-range-replicas response.
 *
 * <p>The number of reported states must equal
 * {@code (nodesPerDc + newNodesPerDc) * dcReplication.size()}. For every cluster node whose local
 * datacenter is contained in {@code dcReplication}, the state reported under its
 * {@code host:port} broadcast address must equal the value produced by {@code statusFunction}.
 *
 * @param mappingResponse the response returned by the endpoint
 * @param dcReplication   names of the datacenters that are expected to hold replicas
 * @param statusFunction  maps a 1-based node index to the state expected for that node
 */
protected void validateNodeStates(TokenRangeReplicasResponse mappingResponse,
                                  Set<String> dcReplication,
                                  Function<Integer, String> statusFunction)
{
    CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
    int nodesPerReplicatedDc = annotation.nodesPerDc() + annotation.newNodesPerDc();
    int expectedReplicas = nodesPerReplicatedDc * dcReplication.size();

    AbstractCassandraTestContext testContext = sidecarTestContext.cassandraTestContext();
    assertThat(mappingResponse.replicaState().size()).isEqualTo(expectedReplicas);

    // Cluster nodes are addressed with 1-based indices
    for (int nodeIndex = 1; nodeIndex <= testContext.cluster().size(); nodeIndex++)
    {
        IInstanceConfig nodeConfig = testContext.cluster().get(nodeIndex).config();
        if (!dcReplication.contains(nodeConfig.localDatacenter()))
        {
            // Nodes outside the replicated datacenters are not validated
            continue;
        }

        String host = nodeConfig.broadcastAddress().getAddress().getHostAddress();
        String address = host + ":" + nodeConfig.broadcastAddress().getPort();
        String expectedStatus = statusFunction.apply(nodeIndex);
        assertThat(mappingResponse.replicaState().get(address)).isEqualTo(expectedStatus);
    }
}
/**
 * Configures and starts a multi-datacenter cluster without any additional builder configuration.
 *
 * @param initializer          instance initializer registered on the cluster builder
 * @param cassandraTestContext test context used to configure and start the cluster
 * @return the started cluster
 * @throws IOException propagated from the three-argument overload
 */
protected UpgradeableCluster getMultiDCCluster(BiConsumer<ClassLoader, Integer> initializer,
                                               ConfigurableCassandraTestContext cassandraTestContext)
throws IOException
{
    // A null configurator tells the delegate that no extra builder customisation is required
    Consumer<UpgradeableCluster.Builder> noAdditionalConfiguration = null;
    return getMultiDCCluster(initializer, cassandraTestContext, noAdditionalConfiguration);
}
/**
 * Configures and starts a multi-datacenter cluster.
 *
 * <p>The builder receives the given instance initializer, a token supplier derived from the
 * test annotation's node and datacenter counts, and a topology in which odd node ids are placed
 * in {@code datacenter1/rack1} and even node ids in {@code datacenter2/rack2}. When
 * {@code additionalConfigurator} is non-null it is applied to the builder last.
 *
 * @param initializer            instance initializer registered on the cluster builder
 * @param cassandraTestContext   test context used to configure and start the cluster
 * @param additionalConfigurator optional extra builder configuration; may be {@code null}
 * @return the started cluster
 * @throws IOException declared by this method's signature
 */
protected UpgradeableCluster getMultiDCCluster(BiConsumer<ClassLoader, Integer> initializer,
                                               ConfigurableCassandraTestContext cassandraTestContext,
                                               Consumer<UpgradeableCluster.Builder> additionalConfigurator)
throws IOException
{
    CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
    int existingNodesPerDc = annotation.nodesPerDc();
    int joiningNodesPerDc = annotation.newNodesPerDc();
    int dcCount = annotation.numDcs();

    TokenSupplier tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(existingNodesPerDc,
                                                                            joiningNodesPerDc,
                                                                            dcCount,
                                                                            1);
    int totalNodeCount = (existingNodesPerDc + joiningNodesPerDc) * dcCount;

    return cassandraTestContext.configureAndStartCluster(builder -> {
        builder.withInstanceInitializer(initializer);
        builder.withTokenSupplier(tokenSupplier);
        // Alternate nodes between the two datacenters: odd ids -> datacenter1, even ids -> datacenter2
        builder.withNodeIdTopology(networkTopology(totalNodeCount, nodeId -> {
            if (nodeId % 2 == 0)
            {
                return dcAndRack("datacenter2", "rack2");
            }
            return dcAndRack("datacenter1", "rack1");
        }));

        if (additionalConfigurator != null)
        {
            additionalConfigurator.accept(builder);
        }
    });
}
/**
 * Generates the expected token ranges for a keyspace that is replicated across datacenters.
 *
 * @return the expected ranges, as produced by {@link #generateExpectedRanges(boolean)} with {@code true}
 */
protected List<Range<BigInteger>> generateExpectedRanges()
{
    boolean crossDcKeyspace = true;
    return generateExpectedRanges(crossDcKeyspace);
}
/**
 * Generates the expected token ranges, choosing the node count from the kind of keyspace.
 *
 * @param isCrossDCKeyspace {@code true} when the keyspace is replicated across datacenters
 * @return the expected ranges for the computed node count
 */
protected List<Range<BigInteger>> generateExpectedRanges(boolean isCrossDCKeyspace)
{
    CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;

    int nodeCount;
    if (isCrossDCKeyspace)
    {
        // Existing and new nodes of every datacenter contribute a token
        nodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * annotation.numDcs();
    }
    else
    {
        // For single DC keyspaces the ranges are initially allocated replicas from both DCs, so the
        // existing nodes of all DCs are counted. Only the new/joining nodes are limited to the single DC,
        // because the pending nodes for the keyspace exclude the nodes from other DCs.
        nodeCount = (annotation.nodesPerDc() * annotation.numDcs()) + annotation.newNodesPerDc();
    }
    return generateExpectedRanges(nodeCount);
}
/**
 * Generates the expected token ranges for the first {@code nodeCount} nodes.
 *
 * <p>The first token of each node (node ids {@code 1..nodeCount}) is collected in sorted order.
 * The result is a list of open-closed ranges that starts at the Murmur3 minimum token, has one
 * boundary at every collected token, and ends at the Murmur3 maximum token.
 *
 * @param nodeCount number of nodes whose tokens delimit the ranges
 * @return the expected ranges in ascending token order
 */
protected List<Range<BigInteger>> generateExpectedRanges(int nodeCount)
{
    CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
    TokenSupplier supplier = TestTokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc(),
                                                                       annotation.newNodesPerDc(),
                                                                       annotation.numDcs(),
                                                                       1);

    // Sorted set of the first token owned by each node
    TreeSet<BigInteger> sortedTokens = new TreeSet<>();
    for (int nodeId = 1; nodeId <= nodeCount; nodeId++)
    {
        sortedTokens.add(new BigInteger(supplier.tokens(nodeId).stream().findFirst().get()));
    }

    List<Range<BigInteger>> ranges = new ArrayList<>();
    BigInteger ringStart = Partitioner.Murmur3.minToken;
    BigInteger ringEnd = Partitioner.Murmur3.maxToken;

    // The lowest token closes the first range; it is removed so the loop starts at the second token
    BigInteger lowerBound = sortedTokens.pollFirst();
    ranges.add(Range.openClosed(ringStart, lowerBound));
    for (BigInteger upperBound : sortedTokens)
    {
        ranges.add(Range.openClosed(lowerBound, upperBound));
        lowerBound = upperBound;
    }

    // The final range runs from the highest token to the end of the ring
    ranges.add(Range.openClosed(lowerBound, ringEnd));
    return ranges;
}
/**
 * Collects the distinct instances that appear in any datacenter of any of the given replica entries.
 *
 * @param replicas replica entries taken from a token-range-replicas response
 * @return the set of all instance strings found across all entries and datacenters
 */
protected Set<String> instancesFromReplicaSet(List<TokenRangeReplicasResponse.ReplicaInfo> replicas)
{
    return replicas.stream()
                   .map(TokenRangeReplicasResponse.ReplicaInfo::replicasByDatacenter)
                   .map(Map::values)
                   .flatMap(Collection::stream)
                   .flatMap(Collection::stream)
                   .collect(Collectors.toSet());
}
/**
 * Validates the write-replica mappings of a keyspace that is replicated across datacenters.
 *
 * @param writeReplicas        write-replica entries taken from the response
 * @param expectedRangeMapping expected replicas, keyed by datacenter name and then by token range
 */
protected void validateWriteReplicaMappings(List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas,
                                            Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMapping)
{
    boolean crossDcKeyspace = true;
    validateWriteReplicaMappings(writeReplicas, expectedRangeMapping, crossDcKeyspace);
}
/**
 * Validates that the write-replica mappings of a response match the expected mapping.
 *
 * <p>The number of write-replica entries must equal the number of expected ranges for
 * {@code datacenter1}. For each entry, the replicas reported for {@code datacenter1} (with the
 * port stripped) must match the expected replicas for the entry's range, in any order. When the
 * test annotation declares more than one datacenter and {@code isCrossDCKeyspace} is
 * {@code true}, the same check is performed for {@code datacenter2}.
 *
 * @param writeReplicas        write-replica entries taken from the response
 * @param expectedRangeMapping expected replicas, keyed by datacenter name and then by token range
 * @param isCrossDCKeyspace    whether the keyspace is replicated across datacenters
 */
protected void validateWriteReplicaMappings(List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas,
                                            Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMapping,
                                            boolean isCrossDCKeyspace)
{
    CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
    boolean validateSecondDatacenter = annotation.numDcs() > 1 && isCrossDCKeyspace;

    // Guard the lookup below so a missing datacenter fails as an assertion rather than an NPE
    assertThat(expectedRangeMapping).containsKey("datacenter1");
    // Validates the no. of ranges in the write-replica mappings match the no. of expected ranges
    assertThat(writeReplicas).hasSize(expectedRangeMapping.get("datacenter1").size());

    for (TokenRangeReplicasResponse.ReplicaInfo replica : writeReplicas)
    {
        // Parsed as BigInteger, consistent with validateTokenRanges
        Range<BigInteger> range = Range.openClosed(new BigInteger(replica.start()),
                                                   new BigInteger(replica.end()));
        assertReplicasMatchExpected(replica, range, expectedRangeMapping, "datacenter1");
        if (validateSecondDatacenter)
        {
            assertReplicasMatchExpected(replica, range, expectedRangeMapping, "datacenter2");
        }
    }
}

/**
 * Asserts that the replicas reported for one datacenter of a replica entry match the expected
 * replicas for the given range, ignoring order and port numbers.
 */
private void assertReplicasMatchExpected(TokenRangeReplicasResponse.ReplicaInfo replica,
                                         Range<BigInteger> range,
                                         Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMapping,
                                         String datacenter)
{
    assertThat(expectedRangeMapping).containsKey(datacenter);
    assertThat(expectedRangeMapping.get(datacenter)).containsKey(range);

    List<String> reportedReplicas = replica.replicasByDatacenter().get(datacenter);
    assertThat(reportedReplicas)
    .as("Replicas reported for datacenter %s and range %s", datacenter, range)
    .isNotNull();

    // Replicaset for the same range match expected; only the host part before ':' is compared
    List<String> replicaSetNoPort = reportedReplicas.stream()
                                                    .map(node -> node.split(":")[0])
                                                    .collect(Collectors.toList());
    assertThat(replicaSetNoPort)
    .containsExactlyInAnyOrderElementsOf(expectedRangeMapping.get(datacenter).get(range));
}
/**
 * Issues a GET request to the token-range-replicas route of the given keyspace and hands a
 * successful response to {@code verifier}.
 *
 * @param context  the Vert.x test context; the request handler is wrapped with {@code context.succeeding}
 * @param keyspace the keyspace whose token range replicas are requested
 * @param verifier handler invoked with the HTTP response
 * @throws Exception propagated from {@code testWithClient}
 */
void retrieveMappingWithKeyspace(VertxTestContext context, String keyspace,
                                 Handler<HttpResponse<Buffer>> verifier) throws Exception
{
    String testRoute = String.format("/api/v1/keyspaces/%s/token-range-replicas", keyspace);
    testWithClient(context, client -> {
        // The port is read when the client callback runs
        int port = server.actualPort();
        client.get(port, "127.0.0.1", testRoute)
              .send(context.succeeding(verifier));
    });
}
/**
 * Performs basic sanity checks on a token-range-replicas response.
 *
 * <p>The response and its read and write replica lists must be present and non-empty. Only the
 * first entry of each list is inspected. It must contain exactly one entry per replicated
 * datacenter, and for every datacenter in {@code dcReplication}:
 * <ul>
 *   <li>the read replicas must contain exactly {@code replicationFactor} instances</li>
 *   <li>the write replicas must contain at least {@code replicationFactor} instances</li>
 * </ul>
 *
 * @param mappingResponse   the response returned by the endpoint
 * @param replicationFactor the replication factor expected per datacenter
 * @param dcReplication     names of the datacenters that are expected to hold replicas
 */
void assertMappingResponseOK(TokenRangeReplicasResponse mappingResponse,
                             int replicationFactor,
                             Set<String> dcReplication)
{
    assertThat(mappingResponse).isNotNull();
    // Non-emptiness is asserted so the get(0) calls below cannot fail with IndexOutOfBoundsException
    assertThat(mappingResponse.readReplicas()).as("read replicas").isNotNull().isNotEmpty();
    assertThat(mappingResponse.writeReplicas()).as("write replicas").isNotNull().isNotEmpty();

    TokenRangeReplicasResponse.ReplicaInfo readReplica = mappingResponse.readReplicas().get(0);
    assertThat(readReplica.replicasByDatacenter()).isNotNull().hasSize(dcReplication.size());
    TokenRangeReplicasResponse.ReplicaInfo writeReplica = mappingResponse.writeReplicas().get(0);
    assertThat(writeReplica.replicasByDatacenter()).isNotNull().hasSize(dcReplication.size());

    for (String dcName : dcReplication)
    {
        assertThat(readReplica.replicasByDatacenter().keySet()).isNotEmpty().contains(dcName);
        assertThat(readReplica.replicasByDatacenter().get(dcName)).isNotNull().hasSize(replicationFactor);

        assertThat(writeReplica.replicasByDatacenter().keySet()).isNotEmpty().contains(dcName);
        assertThat(writeReplica.replicasByDatacenter().get(dcName)).isNotNull();
        // Write replicas are only bounded from below by the replication factor
        assertThat(writeReplica.replicasByDatacenter().get(dcName).size())
        .isGreaterThanOrEqualTo(replicationFactor);
    }
}
}