| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.distributed.test.ring; |
| |
| import java.lang.management.ManagementFactory; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.config.CassandraRelevantProperties; |
| import org.apache.cassandra.distributed.Cluster; |
| import org.apache.cassandra.distributed.api.ConsistencyLevel; |
| import org.apache.cassandra.distributed.api.ICluster; |
| import org.apache.cassandra.distributed.api.IInstanceConfig; |
| import org.apache.cassandra.distributed.api.IInvokableInstance; |
| import org.apache.cassandra.distributed.api.TokenSupplier; |
| import org.apache.cassandra.distributed.shared.NetworkTopology; |
| import org.apache.cassandra.distributed.test.TestBaseImpl; |
| |
| import static java.util.Arrays.asList; |
| import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap; |
| import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom; |
| import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap; |
| import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; |
| import static org.apache.cassandra.distributed.api.Feature.GOSSIP; |
| import static org.apache.cassandra.distributed.api.Feature.NETWORK; |
| |
| public class BootstrapTest extends TestBaseImpl |
| { |
| private long savedMigrationDelay; |
| |
| @Before |
| public void beforeTest() |
| { |
| // MigrationCoordinator schedules schema pull requests immediatelly when the node is just starting up, otherwise |
| // the first pull request is sent in 60 seconds. Whether we are starting up or not is detected by examining |
| // the node up-time and if it is lower than MIGRATION_DELAY, we consider the server is starting up. |
| // When we are running multiple test cases in the class, where each starts a node but in the same JVM, the |
| // up-time will be more or less relevant only for the first test. In order to enforce the startup-like behaviour |
| // for each test case, the MIGRATION_DELAY time is adjusted accordingly |
| savedMigrationDelay = CassandraRelevantProperties.MIGRATION_DELAY.getLong(); |
| CassandraRelevantProperties.MIGRATION_DELAY.setLong(ManagementFactory.getRuntimeMXBean().getUptime() + savedMigrationDelay); |
| } |
| |
| @After |
| public void afterTest() |
| { |
| CassandraRelevantProperties.MIGRATION_DELAY.setLong(savedMigrationDelay); |
| } |
| |
| @Test |
| public void bootstrapTest() throws Throwable |
| { |
| int originalNodeCount = 2; |
| int expandedNodeCount = originalNodeCount + 1; |
| |
| try (Cluster cluster = builder().withNodes(originalNodeCount) |
| .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) |
| .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) |
| .withConfig(config -> config.with(NETWORK, GOSSIP)) |
| .start()) |
| { |
| populate(cluster, 0, 100); |
| |
| IInstanceConfig config = cluster.newInstanceConfig(); |
| IInvokableInstance newInstance = cluster.bootstrap(config); |
| withProperty("cassandra.join_ring", false, |
| () -> newInstance.startup(cluster)); |
| |
| cluster.forEach(statusToBootstrap(newInstance)); |
| |
| cluster.run(asList(pullSchemaFrom(cluster.get(1)), |
| bootstrap()), |
| newInstance.config().num()); |
| |
| for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) |
| Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", |
| 100L, |
| e.getValue().longValue()); |
| } |
| } |
| |
| @Test |
| public void readWriteDuringBootstrapTest() throws Throwable |
| { |
| int originalNodeCount = 2; |
| int expandedNodeCount = originalNodeCount + 1; |
| |
| try (Cluster cluster = builder().withNodes(originalNodeCount) |
| .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) |
| .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) |
| .withConfig(config -> config.with(NETWORK, GOSSIP)) |
| .start()) |
| { |
| IInstanceConfig config = cluster.newInstanceConfig(); |
| IInvokableInstance newInstance = cluster.bootstrap(config); |
| withProperty("cassandra.join_ring", false, |
| () -> newInstance.startup(cluster)); |
| |
| cluster.forEach(statusToBootstrap(newInstance)); |
| |
| populate(cluster, 0, 100); |
| |
| Assert.assertEquals(100, newInstance.executeInternal("SELECT *FROM " + KEYSPACE + ".tbl").length); |
| } |
| } |
| |
| public static void populate(ICluster cluster, int from, int to) |
| { |
| populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM); |
| } |
| |
| public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl) |
| { |
| cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"); |
| cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); |
| for (int i = from; i < to; i++) |
| { |
| cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", |
| cl, |
| i, i, i); |
| } |
| } |
| |
| public static Map<Integer, Long> count(ICluster cluster) |
| { |
| return IntStream.rangeClosed(1, cluster.size()) |
| .boxed() |
| .collect(Collectors.toMap(nodeId -> nodeId, |
| nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0])); |
| } |
| } |