/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.hostreplacement;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.assertj.core.api.Assertions;

import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
import static org.apache.cassandra.distributed.shared.ClusterUtils.assertInRing;
import static org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
import static org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens;
import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;

public class HostReplacementTest extends TestBaseImpl
{
private static final Logger logger = LoggerFactory.getLogger(HostReplacementTest.class);

static
{
// Gossip has a notion of quarantine, which is used to remove "fat clients" and "gossip-only members"
// from the ring if they have not been updated recently ("recently" is defined by this config).
// Setting this to 0 makes sure that even under such an aggressive environment, we do NOT remove
// nodes from the peers table.
GOSSIPER_QUARANTINE_DELAY.setInt(0);
}

/**
 * Attempt to do a host replacement on a downed host.
 */
@Test
public void replaceDownedHost() throws IOException
{
// start with 2 nodes, stop node 2, then host replace the downed node
TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
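// the replacing node is added to the cluster as node 3, so hand it node 2's token;
// this lets it take over the downed node's range during the host replacement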
try (Cluster cluster = Cluster.build(2)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.withTokenSupplier(node -> even.token(node == 3 ? 2 : node))
.start())
{
IInvokableInstance seed = cluster.get(1);
IInvokableInstance nodeToRemove = cluster.get(2);
setupCluster(cluster);
// collect rows to detect issues later on if the state doesn't match
SimpleQueryResult expectedState = nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
stopUnchecked(nodeToRemove);
// now create a new node to replace the other node
IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove, props -> {
// since we have a downed host, an old schema version may still show up in gossip but
// can't be fetched since the host is down, so skip the schema check during bootstrap
props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
});
// wait till the replacing node is in the ring
awaitRingJoin(seed, replacingNode);
awaitRingJoin(replacingNode, seed);
// make sure all nodes are healthy
awaitRingHealthy(seed);
assertRingIs(seed, seed, replacingNode);
logger.info("Current ring is {}", assertRingIs(replacingNode, seed, replacingNode));
validateRows(seed.coordinator(), expectedState);
validateRows(replacingNode.coordinator(), expectedState);
}
}

/**
 * Attempt to do a host replacement on a live host.
 */
@Test
public void replaceAliveHost() throws IOException
{
// start with 2 nodes, then attempt to host replace node 2 while it is still alive
TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
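// the attempted replacement would join as node 3, so map it to node 2's token as well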
try (Cluster cluster = Cluster.build(2)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)
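// the replacement's startup is expected to fail; don't treat that failure as a shutdown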
.set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
.withTokenSupplier(node -> even.token(node == 3 ? 2 : node))
.start())
{
IInvokableInstance seed = cluster.get(1);
IInvokableInstance nodeToRemove = cluster.get(2);
setupCluster(cluster);
// collect rows to detect issues later on if the state doesn't match
SimpleQueryResult expectedState = nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
// now create a new node to replace the other node
Assertions.assertThatThrownBy(() -> replaceHostAndStart(cluster, nodeToRemove))
.as("Startup of instance should have failed as you cannot replace a live node")
.hasMessageContaining("Cannot replace a live node")
.isInstanceOf(UnsupportedOperationException.class);
// make sure all nodes are healthy
awaitRingHealthy(seed);
assertRingIs(seed, seed, nodeToRemove);
logger.info("Current ring is {}", assertRingIs(nodeToRemove, seed, nodeToRemove));
validateRows(seed.coordinator(), expectedState);
validateRows(nodeToRemove.coordinator(), expectedState);
}
}

/**
 * If the seed goes down and then another node goes down, make sure host replacement still works
 * once the seed comes back.
 */
@Test
public void seedGoesDownBeforeDownHost() throws IOException
{
// start with 3 nodes, stop the seed and node 2, restart the seed, then host replace node 2
TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
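// the replacing node is added to the cluster as node 4, so hand it node 2's token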
try (Cluster cluster = Cluster.build(3)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.withTokenSupplier(node -> even.token(node == 4 ? 2 : node))
.start())
{
// capture these references early, as they can't be looked up on a downed node
IInvokableInstance seed = cluster.get(1);
IInvokableInstance nodeToRemove = cluster.get(2);
IInvokableInstance nodeToStayAlive = cluster.get(3);
setupCluster(cluster);
// collect rows/tokens to detect issues later on if the state doesn't match
SimpleQueryResult expectedState = nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
List<String> beforeCrashTokens = getTokenMetadataTokens(seed);
// shutdown the seed, then the node to remove
stopUnchecked(seed);
stopUnchecked(nodeToRemove);
// restart the seed
seed.startup();
// make sure the node to remove is still in the ring
assertInRing(seed, nodeToRemove);
// make sure node1 still has node2's tokens
List<String> currentTokens = getTokenMetadataTokens(seed);
Assertions.assertThat(currentTokens)
.as("Tokens no longer match after restarting")
.isEqualTo(beforeCrashTokens);
// now create a new node to replace the other node
IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove);
List<IInvokableInstance> expectedRing = Arrays.asList(seed, replacingNode, nodeToStayAlive);
// wait till the replacing node is in the ring
awaitRingJoin(seed, replacingNode);
awaitRingJoin(replacingNode, seed);
awaitRingJoin(nodeToStayAlive, replacingNode);
// make sure all nodes are healthy
logger.info("Current ring is {}", awaitRingHealthy(seed));
expectedRing.forEach(i -> assertRingIs(i, expectedRing));
validateRows(seed.coordinator(), expectedState);
validateRows(replacingNode.coordinator(), expectedState);
}
}
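
// common setup: fix replication of the distributed system keyspaces, create and populate
// the test table, and flush so the rows live in sstables rather than just memtables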
static void setupCluster(Cluster cluster)
{
fixDistributedSchemas(cluster);
init(cluster);
populate(cluster);
cluster.forEach(i -> i.flush(KEYSPACE));
}
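
// write a small fixed data set at CL.ALL so every replica has the rows before nodes go down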
static void populate(Cluster cluster)
{
cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int PRIMARY KEY)");
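// insert a handful of rows; these are what the replacement node must end up serving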
for (int i = 0; i < 10; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)",
ConsistencyLevel.ALL,
i);
}
}
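
// read the table back through the given coordinator and compare against the rows captured earlier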
static void validateRows(ICoordinator coordinator, SimpleQueryResult expected)
{
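// rewind the captured result set so it can be iterated again for this comparison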
expected.reset();
SimpleQueryResult rows = coordinator.executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
AssertUtils.assertRows(rows, expected);
}
}