blob: d6e715a009cf3f3633a6fb76f8bf7e1e37fb0d2d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.ring;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import static java.util.Arrays.asList;
import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
/**
 * Tests that start a two-node cluster, add one more instance to it, and check
 * the rows stored in {@code KEYSPACE.tbl}.
 *
 * The static {@code populate} and {@code count} helpers are public so they can
 * be called with any {@link ICluster}.
 */
public class BootstrapTest extends TestBaseImpl
{
    /** Number of rows each test writes (partition keys {@code 0} inclusive to {@code 100} exclusive). */
    private static final int ROWS = 100;

    /**
     * Writes the rows first, then starts the extra instance, runs the
     * {@code pullSchemaFrom(node 1)} and {@code bootstrap()} actions on it, and
     * finally asserts that every instance reports {@code ROWS} rows locally.
     */
    @Test
    public void bootstrapTest() throws Throwable
    {
        try (Cluster cluster = startTwoNodeClusterWithSpareSlot())
        {
            populate(cluster, 0, ROWS);

            IInvokableInstance joiner = startWithoutJoiningRing(cluster);
            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
                               bootstrap()),
                        joiner.config().num());

            count(cluster).forEach((nodeId, rows) ->
                Assert.assertEquals("Node " + nodeId + " has incorrect row state",
                                    ROWS,
                                    rows.longValue()));
        }
    }

    /**
     * Starts the extra instance first, then writes the rows, and asserts that an
     * internal {@code SELECT} on the extra instance returns {@code ROWS} rows.
     * Unlike {@link #bootstrapTest()}, the bootstrap action itself is never run here.
     */
    @Test
    public void readWriteDuringBootstrapTest() throws Throwable
    {
        try (Cluster cluster = startTwoNodeClusterWithSpareSlot())
        {
            IInvokableInstance joiner = startWithoutJoiningRing(cluster);

            populate(cluster, 0, ROWS);

            int rowsOnJoiner = joiner.executeInternal("SELECT *FROM " + KEYSPACE + ".tbl").length;
            Assert.assertEquals(ROWS, rowsOnJoiner);
        }
    }

    /**
     * Builds and starts a cluster of two nodes whose token supplier and single-DC
     * topology ("dc0"/"rack0") are both sized for three nodes, with the NETWORK
     * and GOSSIP features enabled.
     *
     * Declared {@code throws Throwable} to mirror the test methods that call it.
     */
    private Cluster startTwoNodeClusterWithSpareSlot() throws Throwable
    {
        int originalNodeCount = 2;
        int expandedNodeCount = originalNodeCount + 1;
        return builder().withNodes(originalNodeCount)
                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
                        .withConfig(config -> config.with(NETWORK, GOSSIP))
                        .start();
    }

    /**
     * Creates a new instance from a fresh instance config, starts it under
     * {@code withProperty("cassandra.join_ring", false, ...)}, and then applies
     * {@code statusToBootstrap(newInstance)} to every instance in the cluster.
     *
     * @param cluster the running cluster to extend
     * @return the newly started instance
     */
    private static IInvokableInstance startWithoutJoiningRing(Cluster cluster) throws Throwable
    {
        IInstanceConfig joinerConfig = cluster.newInstanceConfig();
        IInvokableInstance joiner = cluster.bootstrap(joinerConfig);
        withProperty("cassandra.join_ring", false,
                     () -> joiner.startup(cluster));
        cluster.forEach(statusToBootstrap(joiner));
        return joiner;
    }

    /**
     * Convenience overload: coordinator 1, replication factor 3, QUORUM writes.
     *
     * @param cluster cluster to write to
     * @param from    first key, inclusive
     * @param to      last key, exclusive
     */
    public static void populate(ICluster cluster, int from, int to)
    {
        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
    }

    /**
     * Creates the keyspace and table if they do not exist yet, then inserts one
     * row {@code (i, i, i)} for every {@code i} in {@code [from, to)}.
     *
     * @param cluster cluster to write to
     * @param from    first key, inclusive
     * @param to      last key, exclusive
     * @param coord   number of the node whose coordinator executes the inserts
     * @param rf      replication factor used in the CREATE KEYSPACE statement
     * @param cl      consistency level of each insert
     */
    public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
    {
        String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};";
        String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))";
        String insertRow = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)";

        cluster.schemaChange(createKeyspace);
        cluster.schemaChange(createTable);

        // The coordinator is looked up on every iteration, exactly as many times as there are rows.
        for (int key = from; key < to; key++)
        {
            cluster.coordinator(coord).execute(insertRow, cl, key, key, key);
        }
    }

    /**
     * Runs an internal {@code SELECT count(*)} against the table on every node
     * numbered {@code 1..cluster.size()}.
     *
     * @param cluster cluster to query
     * @return map from node number to the count that node returned
     */
    public static Map<Integer, Long> count(ICluster cluster)
    {
        return IntStream.rangeClosed(1, cluster.size())
                        .boxed()
                        .collect(Collectors.toMap(id -> id,
                                                  id -> rowCountOn(cluster, id)));
    }

    /** Returns the first column of the first row of an internal {@code count(*)} query on the given node. */
    private static Long rowCountOn(ICluster cluster, int nodeId)
    {
        String query = "SELECT count(*) FROM " + KEYSPACE + ".tbl";
        return (Long) cluster.get(nodeId).executeInternal(query)[0][0];
    }
}