blob: 924eb717149d2106fb01d80943ddb9f22817a3eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.routes.tokenrange;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
import org.apache.cassandra.testing.CassandraIntegrationTest;
import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Base class for TokenRangeIntegrationJoining tests
*/
class JoiningBaseTest extends BaseTokenRangeIntegrationTest
{
void runJoiningTestScenario(VertxTestContext context,
CountDownLatch transientStateStart,
CountDownLatch transientStateEnd,
UpgradeableCluster cluster,
List<Range<BigInteger>> expectedRanges,
Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMappings,
boolean isCrossDCKeyspace)
throws Exception
{
CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
try
{
Set<String> dcReplication;
if (annotation.numDcs() > 1 && isCrossDCKeyspace)
{
createTestKeyspace(ImmutableMap.of("replication_factor", DEFAULT_RF));
dcReplication = Sets.newHashSet(Arrays.asList("datacenter1", "datacenter2"));
}
else
{
createTestKeyspace(ImmutableMap.of("datacenter1", DEFAULT_RF));
dcReplication = Collections.singleton("datacenter1");
}
IUpgradeableInstance seed = cluster.get(1);
List<IUpgradeableInstance> newInstances = new ArrayList<>();
// Go over new nodes and add them once for each DC
for (int i = 0; i < annotation.newNodesPerDc(); i++)
{
int dcNodeIdx = 1; // Use node 2's DC
for (int dc = 1; dc <= annotation.numDcs(); dc++)
{
IUpgradeableInstance dcNode = cluster.get(dcNodeIdx++);
IUpgradeableInstance newInstance = ClusterUtils.addInstance(cluster,
dcNode.config().localDatacenter(),
dcNode.config().localRack(),
inst -> {
inst.set("auto_bootstrap", true);
inst.with(Feature.GOSSIP,
Feature.JMX,
Feature.NATIVE_PROTOCOL);
});
new Thread(() -> newInstance.startup(cluster)).start();
newInstances.add(newInstance);
}
}
Uninterruptibles.awaitUninterruptibly(transientStateStart, 2, TimeUnit.MINUTES);
for (IUpgradeableInstance newInstance : newInstances)
{
ClusterUtils.awaitRingState(seed, newInstance, "Joining");
}
retrieveMappingWithKeyspace(context, TEST_KEYSPACE, response -> {
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
TokenRangeReplicasResponse mappingResponse = response.bodyAsJson(TokenRangeReplicasResponse.class);
assertMappingResponseOK(mappingResponse,
DEFAULT_RF,
dcReplication);
int finalNodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * annotation.numDcs();
TokenSupplier tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc(),
annotation.newNodesPerDc(),
annotation.numDcs(),
1);
// New split ranges resulting from joining nodes and corresponding tokens
List<Range<BigInteger>> splitRanges = extractSplitRanges(annotation.newNodesPerDc() *
annotation.numDcs(),
finalNodeCount,
tokenSupplier,
expectedRanges);
List<Integer> newNodes = newInstances.stream().map(i -> i.config().num()).collect(Collectors.toList());
validateNodeStates(mappingResponse,
dcReplication,
nodeNumber -> newNodes.contains(nodeNumber) ? "Joining" : "Normal");
validateTokenRanges(mappingResponse, expectedRanges);
validateReplicaMapping(mappingResponse,
newInstances,
isCrossDCKeyspace,
splitRanges,
expectedRangeMappings);
context.completeNow();
});
}
finally
{
for (int i = 0;
i < (annotation.newNodesPerDc() * annotation.numDcs()); i++)
{
transientStateEnd.countDown();
}
}
}
private void validateReplicaMapping(TokenRangeReplicasResponse mappingResponse,
List<IUpgradeableInstance> newInstances,
boolean isCrossDCKeyspace,
List<Range<BigInteger>> splitRanges,
Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMappings)
{
if (!isCrossDCKeyspace)
{
newInstances = newInstances.stream()
.filter(i -> i.config().localDatacenter().equals("datacenter1"))
.collect(Collectors.toList());
}
List<String> transientNodeAddresses = newInstances.stream().map(i -> {
InetSocketAddress address = i.config().broadcastAddress();
return address.getAddress().getHostAddress() +
":" +
address.getPort();
}).collect(Collectors.toList());
Set<String> writeReplicaInstances = instancesFromReplicaSet(mappingResponse.writeReplicas());
Set<String> readReplicaInstances = instancesFromReplicaSet(mappingResponse.readReplicas());
Set<String> splitRangeReplicas
= mappingResponse.writeReplicas().stream()
.filter(w -> matchSplitRanges(w, splitRanges))
.map(r ->
r.replicasByDatacenter().values())
.flatMap(Collection::stream)
.flatMap(list -> list.stream())
.collect(Collectors.toSet());
assertThat(readReplicaInstances).doesNotContainAnyElementsOf(transientNodeAddresses);
// Validate that the new nodes are mapped to the split ranges
assertThat(splitRangeReplicas).containsAll(transientNodeAddresses);
assertThat(writeReplicaInstances).containsAll(transientNodeAddresses);
validateWriteReplicaMappings(mappingResponse.writeReplicas(), expectedRangeMappings, isCrossDCKeyspace);
}
private List<Range<BigInteger>> extractSplitRanges(int newNodes,
int finalNodeCount,
TokenSupplier tokenSupplier,
List<Range<BigInteger>> expectedRanges)
{
int newNode = 1;
List<BigInteger> newNodeTokens = new ArrayList<>();
while (newNode <= newNodes)
{
int nodeIdx = finalNodeCount - newNode;
newNodeTokens.add(new BigInteger(tokenSupplier.tokens(nodeIdx).stream().findFirst().get()));
newNode++;
}
return expectedRanges.stream()
.filter(r -> newNodeTokens.contains(r.upperEndpoint()) ||
newNodeTokens.contains(r.lowerEndpoint()))
.collect(Collectors.toList());
}
private boolean matchSplitRanges(TokenRangeReplicasResponse.ReplicaInfo range,
List<Range<BigInteger>> expectedSplitRanges)
{
return expectedSplitRanges.stream()
.anyMatch(s -> range.start().equals(s.lowerEndpoint().toString()) &&
range.end().equals(s.upperEndpoint().toString()));
}
void runJoiningTestScenario(VertxTestContext context,
ConfigurableCassandraTestContext cassandraTestContext,
BiConsumer<ClassLoader, Integer> instanceInitializer,
CountDownLatch transientStateStart,
CountDownLatch transientStateEnd,
Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMappings)
throws Exception
{
CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
TokenSupplier tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc(),
annotation.newNodesPerDc(),
annotation.numDcs(),
1);
UpgradeableCluster cluster = cassandraTestContext.configureAndStartCluster(builder -> {
builder.withInstanceInitializer(instanceInitializer);
builder.withTokenSupplier(tokenSupplier);
});
runJoiningTestScenario(context,
transientStateStart,
transientStateEnd,
cluster,
generateExpectedRanges(),
expectedRangeMappings,
true);
}
}