blob: 22d3b0e1809a2cdf424e9d3500ec1ad99b6a1000 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.routes.tokenrange;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.sidecar.adapters.base.Partitioner;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
import org.apache.cassandra.testing.CassandraIntegrationTest;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Base class for TokenRangeIntegrationMoving Tests
*/
class MovingBaseTest extends BaseTokenRangeIntegrationTest
{
public static final int MOVING_NODE_IDX = 5;
public static final int MULTIDC_MOVING_NODE_IDX = 10;
void runMovingTestScenario(VertxTestContext context,
CountDownLatch transientStateStart,
CountDownLatch transientStateEnd,
UpgradeableCluster cluster,
Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMappings,
long moveTargetToken) throws Exception
{
try
{
CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
Set<String> dcReplication;
if (annotation.numDcs() > 1)
{
createTestKeyspace(ImmutableMap.of("replication_factor", DEFAULT_RF));
dcReplication = Sets.newHashSet(Arrays.asList("datacenter1", "datacenter2"));
}
else
{
createTestKeyspace(ImmutableMap.of("datacenter1", DEFAULT_RF));
dcReplication = Collections.singleton("datacenter1");
}
IUpgradeableInstance seed = cluster.get(1);
int movingNodeIndex = (annotation.numDcs() > 1) ? MULTIDC_MOVING_NODE_IDX : MOVING_NODE_IDX;
IUpgradeableInstance movingNode = cluster.get(movingNodeIndex);
new Thread(() -> movingNode.nodetoolResult("move", "--", Long.toString(moveTargetToken))
.asserts()
.success()).start();
// Wait until nodes have reached expected state
Uninterruptibles.awaitUninterruptibly(transientStateStart, 2, TimeUnit.MINUTES);
ClusterUtils.awaitRingState(seed, movingNode, "Moving");
retrieveMappingWithKeyspace(context, TEST_KEYSPACE, response -> {
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
TokenRangeReplicasResponse mappingResponse = response.bodyAsJson(TokenRangeReplicasResponse.class);
assertMappingResponseOK(mappingResponse,
DEFAULT_RF,
dcReplication);
validateNodeStates(mappingResponse,
dcReplication,
nodeNumber -> nodeNumber == movingNodeIndex ? "Moving" : "Normal");
List<Range<BigInteger>> expectedRanges = getMovingNodesExpectedRanges(annotation.nodesPerDc(),
annotation.numDcs(),
moveTargetToken);
validateTokenRanges(mappingResponse, expectedRanges);
validateReplicaMapping(mappingResponse, movingNode, moveTargetToken, expectedRangeMappings);
context.completeNow();
});
}
finally
{
transientStateEnd.countDown();
}
}
private void validateReplicaMapping(TokenRangeReplicasResponse mappingResponse,
IUpgradeableInstance movingNode,
long moveTo,
Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMappings)
{
InetSocketAddress address = movingNode.config().broadcastAddress();
String expectedAddress = address.getAddress().getHostAddress() +
":" +
address.getPort();
Set<String> writeReplicaInstances = instancesFromReplicaSet(mappingResponse.writeReplicas());
Set<String> readReplicaInstances = instancesFromReplicaSet(mappingResponse.readReplicas());
Optional<TokenRangeReplicasResponse.ReplicaInfo> moveResultRange // Get ranges ending in move token
= mappingResponse.writeReplicas()
.stream()
.filter(r -> r.end().equals(String.valueOf(moveTo)))
.findAny();
assertThat(moveResultRange.isPresent());
List<String> replicasInRange = moveResultRange.get().replicasByDatacenter().values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertThat(replicasInRange).contains(expectedAddress);
assertThat(readReplicaInstances).contains(expectedAddress);
assertThat(writeReplicaInstances).contains(expectedAddress);
validateWriteReplicaMappings(mappingResponse.writeReplicas(), expectedRangeMappings);
}
protected List<Range<BigInteger>> getMovingNodesExpectedRanges(int initialNodeCount, int numDcs, long moveTo)
{
boolean moveHandled = false;
CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
TokenSupplier tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc(),
annotation.newNodesPerDc(),
annotation.numDcs(),
1);
List<Range<BigInteger>> expectedRanges = new ArrayList<>();
BigInteger startToken = Partitioner.Murmur3.minToken;
BigInteger endToken = Partitioner.Murmur3.maxToken;
int node = 1;
BigInteger prevToken = new BigInteger(tokenSupplier.tokens(node++).stream().findFirst().get());
Range<BigInteger> firstRange = Range.openClosed(startToken, prevToken);
expectedRanges.add(firstRange);
while (node <= (initialNodeCount * numDcs))
{
BigInteger currentToken = new BigInteger(tokenSupplier.tokens(node).stream().findFirst().get());
if (!moveHandled && currentToken.compareTo(BigInteger.valueOf(moveTo)) > 0)
{
expectedRanges.add(Range.openClosed(prevToken, BigInteger.valueOf(moveTo)));
expectedRanges.add(Range.openClosed(BigInteger.valueOf(moveTo), currentToken));
moveHandled = true;
}
else
{
expectedRanges.add(Range.openClosed(prevToken, currentToken));
}
prevToken = currentToken;
node++;
}
expectedRanges.add(Range.openClosed(prevToken, endToken));
return expectedRanges;
}
protected long getMoveTargetToken(UpgradeableCluster cluster)
{
CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation;
IUpgradeableInstance seed = cluster.get(1);
// The target token to move the node to is calculated by adding an offset to the seed node token which
// is half of the range between 2 tokens.
// For multi-DC case (specifically 2 DCs), since neighbouring tokens can be consecutive, we use tokens 1
// and 3 to calculate the offset
int nextIndex = (annotation.numDcs() > 1) ? 3 : 2;
long t2 = Long.parseLong(seed.config().getString("initial_token"));
long t3 = Long.parseLong(cluster.get(nextIndex).config().getString("initial_token"));
return (t2 + ((t3 - t2) / 2));
}
}