blob: f31e3a8806d5901f19ed0526afe99d3e3a3d89b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.ring;
import java.lang.management.ManagementFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.schema.SchemaConstants;
import static java.util.Arrays.asList;
import static org.apache.cassandra.config.CassandraRelevantProperties.JOIN_RING;
import static org.apache.cassandra.config.CassandraRelevantProperties.MIGRATION_DELAY;
import static org.apache.cassandra.config.CassandraRelevantProperties.RESET_BOOTSTRAP_PROGRESS;
import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
/**
 * In-JVM dtest exercising node bootstrap: plain bootstrap, bootstrap with/without the
 * reset_bootstrap_progress flag, failure on discovering prior partial bootstrap data,
 * and read/write availability while a node is joining.
 */
public class BootstrapTest extends TestBaseImpl
{
    // NOTE(review): never assigned in this file, so it is always 0 and MIGRATION_DELAY below
    // reduces to just the JVM up-time. Looks like a leftover from an older save/restore
    // pattern — confirm before removing.
    private long savedMigrationDelay;

    static WithProperties properties;

    @Before
    public void beforeTest()
    {
        // MigrationCoordinator schedules schema pull requests immediately when the node is just starting up, otherwise
        // the first pull request is sent in 60 seconds. Whether we are starting up or not is detected by examining
        // the node up-time and if it is lower than MIGRATION_DELAY, we consider the server is starting up.
        // When we are running multiple test cases in the class, where each starts a node but in the same JVM, the
        // up-time will be more or less relevant only for the first test. In order to enforce the startup-like behaviour
        // for each test case, the MIGRATION_DELAY time is adjusted accordingly
        properties = new WithProperties().set(MIGRATION_DELAY, ManagementFactory.getRuntimeMXBean().getUptime() + savedMigrationDelay);
    }

    @After
    public void afterTest()
    {
        // Restore any system properties overridden in beforeTest().
        properties.close();
    }

    /**
     * Bootstrap with resume disabled (reset_bootstrap_progress=false).
     */
    @Test
    public void bootstrapWithResumeTest() throws Throwable
    {
        RESET_BOOTSTRAP_PROGRESS.setBoolean(false);
        bootstrapTest();
    }

    /**
     * Bootstrap with any prior progress reset (reset_bootstrap_progress=true).
     */
    @Test
    public void bootstrapWithoutResumeTest() throws Throwable
    {
        RESET_BOOTSTRAP_PROGRESS.setBoolean(true);
        bootstrapTest();
    }

    /**
     * Confirm that a normal, non-resumed bootstrap without the reset_bootstrap_progress param specified works without issue.
     * @throws Throwable
     */
    @Test
    public void bootstrapUnspecifiedResumeTest() throws Throwable
    {
        RESET_BOOTSTRAP_PROGRESS.clearValue(); // checkstyle: suppress nearby 'clearValueSystemPropertyUsage'
        bootstrapTest();
    }

    /**
     * Confirm that, in the absence of the reset_bootstrap_progress param being set and in the face of a found prior
     * partial bootstrap, we error out and don't complete our bootstrap.
     *
     * Test w/out vnodes only; logic is identical for both run env but the token alloc in this test doesn't work for
     * vnode env and it's not worth the lift to update it to work in both env.
     *
     * @throws Throwable
     */
    @Test
    public void bootstrapUnspecifiedFailsOnResumeTest() throws Throwable
    {
        RESET_BOOTSTRAP_PROGRESS.clearValue(); // checkstyle: suppress nearby 'clearValueSystemPropertyUsage'

        // Need our partitioner active for rangeToBytes conversion below
        Config c = DatabaseDescriptor.loadConfig();
        DatabaseDescriptor.daemonInitialization(() -> c);

        int originalNodeCount = 2;
        int expandedNodeCount = originalNodeCount + 1;
        boolean sawException = false;

        try (Cluster cluster = builder().withNodes(originalNodeCount)
                                        .withoutVNodes()
                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
                                        .start())
        {
            populate(cluster, 0, 100);

            // Start the new node gossiping but not yet joined to the ring.
            IInstanceConfig config = cluster.newInstanceConfig();
            IInvokableInstance newInstance = cluster.bootstrap(config);
            withProperty(JOIN_RING, false, () -> newInstance.startup(cluster));
            cluster.forEach(statusToBootstrap(newInstance));

            List<Token> tokens = cluster.tokens();
            assert tokens.size() >= 3;

            /*
            Our local tokens:
            Tokens in cluster tokens: [-3074457345618258603, 3074457345618258601, 9223372036854775805]

            From the bootstrap process:
                fetchReplicas in our test keyspace:
                [FetchReplica
                    {local=Full(/127.0.0.3:7012,(-3074457345618258603,3074457345618258601]),
                    remote=Full(/127.0.0.1:7012,(-3074457345618258603,3074457345618258601])},
                FetchReplica
                    {local=Full(/127.0.0.3:7012,(9223372036854775805,-3074457345618258603]),
                    remote=Full(/127.0.0.1:7012,(9223372036854775805,-3074457345618258603])},
                FetchReplica
                    {local=Full(/127.0.0.3:7012,(3074457345618258601,9223372036854775805]),
                    remote=Full(/127.0.0.1:7012,(3074457345618258601,9223372036854775805])}]
            */

            // Insert some bogus ranges in the keyspace to be bootstrapped to trigger the check on available ranges on bootstrap.
            // Note: these have to precisely overlap with the token ranges hit during streaming or they won't trigger the
            // availability logic on bootstrap to then except out; we can't just have _any_ range for a keyspace, but rather,
            // must have a range that overlaps with what we're trying to stream.
            Set<Range<Token>> fullSet = new HashSet<>();
            fullSet.add(new Range<>(tokens.get(0), tokens.get(1)));
            fullSet.add(new Range<>(tokens.get(1), tokens.get(2)));
            fullSet.add(new Range<>(tokens.get(2), tokens.get(0)));

            // Should be fine to trigger on full ranges only but add a partial for good measure.
            Set<Range<Token>> partialSet = new HashSet<>();
            partialSet.add(new Range<>(tokens.get(2), tokens.get(1)));

            String cql = String.format("INSERT INTO %s.%s (keyspace_name, full_ranges, transient_ranges) VALUES (?, ?, ?)",
                                       SchemaConstants.SYSTEM_KEYSPACE_NAME,
                                       SystemKeyspace.AVAILABLE_RANGES_V2);

            newInstance.executeInternal(cql,
                                        KEYSPACE,
                                        fullSet.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
                                        partialSet.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()));

            // We expect bootstrap to throw an exception on node3 w/the seen ranges we've inserted
            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
                               bootstrap()),
                        newInstance.config().num());
        }
        catch (AssumptionViolatedException ave)
        {
            // We get an AssumptionViolatedException if we're in a test job configured w/vnodes
            throw ave;
        }
        catch (RuntimeException rte)
        {
            // Throwable.getMessage() may be null; guard so an unrelated RuntimeException doesn't
            // mask the assertion below with an NPE thrown from the test itself.
            String message = rte.getMessage();
            if (message != null && message.contains("Discovered existing bootstrap data"))
                sawException = true;
        }

        Assert.assertTrue("Expected to see a RuntimeException w/'Discovered existing bootstrap data' in the error message; did not.",
                          sawException);
    }

    /**
     * Shared happy-path bootstrap: start a 2-node cluster, add a third node via gossip-driven
     * bootstrap, then verify every node reports the full 100 rows.
     */
    private void bootstrapTest() throws Throwable
    {
        int originalNodeCount = 2;
        int expandedNodeCount = originalNodeCount + 1;

        try (Cluster cluster = builder().withNodes(originalNodeCount)
                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
                                        .start())
        {
            populate(cluster, 0, 100);

            IInstanceConfig config = cluster.newInstanceConfig();
            IInvokableInstance newInstance = cluster.bootstrap(config);
            withProperty(JOIN_RING, false,
                         () -> newInstance.startup(cluster));
            cluster.forEach(statusToBootstrap(newInstance));

            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
                               bootstrap()),
                        newInstance.config().num());

            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
                Assert.assertEquals("Node " + e.getKey() + " has incorrect row state",
                                    100L,
                                    e.getValue().longValue());
        }
    }

    /**
     * Verify the joining node receives writes performed while it is bootstrapping: populate
     * after the new node enters the bootstrap state, then read the rows back from it locally.
     */
    @Test
    public void readWriteDuringBootstrapTest() throws Throwable
    {
        int originalNodeCount = 2;
        int expandedNodeCount = originalNodeCount + 1;

        try (Cluster cluster = builder().withNodes(originalNodeCount)
                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
                                        .start())
        {
            IInstanceConfig config = cluster.newInstanceConfig();
            IInvokableInstance newInstance = cluster.bootstrap(config);
            withProperty(JOIN_RING, false,
                         () -> newInstance.startup(cluster));
            cluster.forEach(statusToBootstrap(newInstance));

            populate(cluster, 0, 100);

            Assert.assertEquals(100, newInstance.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length);
        }
    }

    /**
     * Populate [from, to) rows into the test table via node 1 at QUORUM with RF=3.
     */
    public static void populate(ICluster cluster, int from, int to)
    {
        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
    }

    /**
     * Create the test keyspace/table (idempotent) and insert rows pk=ck=v=i for i in [from, to)
     * through the given coordinator node at the given consistency level.
     */
    public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
    {
        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");
        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
        for (int i = from; i < to; i++)
        {
            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
                                               cl,
                                               i, i, i);
        }
    }

    /**
     * Per-node local row count of the test table, keyed by node id (1-based).
     */
    public static Map<Integer, Long> count(ICluster cluster)
    {
        return IntStream.rangeClosed(1, cluster.size())
                        .boxed()
                        .collect(Collectors.toMap(nodeId -> nodeId,
                                                  nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
    }
}