blob: add6fdf500d657fae048e05827a484789359dc0f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.distributed.test;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.action.GossipHelper;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.distributed.action.GossipHelper.decommission;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.TWO;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
/**
 * Tests around removing and adding nodes from and to a cluster while hints are still outstanding.
 */
public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
/**
 * Exercises a forced decommission of node 1 in a 3-node cluster configured with
 * {@code transfer_hints_on_decommission} set to {@code false}, and checks that node 1 reports the
 * {@code DECOMMISSIONED} operation mode afterwards.
 *
 * NOTE(review): this method looks truncated in this copy. It has no enclosing braces, the
 * try-with-resources header is never terminated, and no test annotation is visible. Confirm
 * against the upstream file before relying on it.
 */
public void shouldAvoidHintTransferOnDecommission() throws Exception
// Three nodes, hint transfer on decommission disabled, gossip enabled.
// NOTE(review): the builder chain ends without a visible start call or closing parentheses.
try (Cluster cluster = init(builder().withNodes(3)
.withConfig(config -> config.set("transfer_hints_on_decommission", false).with(GOSSIP))
cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_no_hints_test (key int PRIMARY KEY, value int)"));
// Baseline write at ALL through node 1, then capture node 1's hint counters.
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), ALL, 0, 0);
// NOTE(review): hintsBeforeShutdown and hintsDelivered are captured but never checked in the
// visible code; the statically imported assertThat is unused, so assertions may be missing.
long hintsBeforeShutdown = countTotalHints(cluster.get(1));
long hintsDelivered = countHintsDelivered(cluster.get(1));
// Shutdown node 3 so hints can be written against it.
// NOTE(review): no shutdown statement for node 3 is visible here; it appears to have been dropped.
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), TWO, 0, 0);
// Block until node 1 reports at least one hint.
Awaitility.await().until(() -> countTotalHints(cluster.get(1)) > 0);
// NOTE(review): captured but not asserted in the visible code.
long hintsAfterShutdown = countTotalHints(cluster.get(1));
// Force-decommission node 1 through nodetool and require the command to succeed.
cluster.get(1).nodetoolResult("decommission", "--force").asserts().success();
// NOTE(review): captured but not asserted in the visible code; presumably meant to show that
// decommission delivered no hints -- confirm.
long hintsDeliveredByDecom = countHintsDelivered(cluster.get(1));
// Node 1 must now report the DECOMMISSIONED operation mode.
String mode = cluster.get(1).callOnInstance(() -> StorageService.instance.getOperationMode());
assertEquals(StorageService.Mode.DECOMMISSIONED.toString(), mode);
/**
 * Replaces a Python dtest.
 *
 * Writes 128 rows through node 2 into a keyspace with a replication factor of 2, waits for node 2
 * to accumulate hints, and then verifies the rows remain readable as the set of endpoints known
 * to the cluster shrinks.
 *
 * NOTE(review): the {@code} reference that named the replaced dtest is empty in this copy. The
 * method also looks truncated: it has no enclosing braces, the try-with-resources header is never
 * terminated, and several steps described by the comments have no statement. Confirm against the
 * upstream file.
 */
public void shouldStreamHintsDuringDecommission() throws Exception
// Four nodes with networking, gossip and the native protocol enabled.
// NOTE(review): the builder chain ends without a visible start call or closing parenthesis.
try (Cluster cluster = builder().withNodes(4)
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}"));
cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_hint_test (key int PRIMARY KEY, value int)"));
// NOTE(review): the comments below say node 4 is down, but no shutdown statement is visible.
// Write data using the second node as the coordinator...
populate(cluster, "decom_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
// ...and verify that we've accumulated hints intended for node 4, which is down.
Awaitility.await().until(() -> countTotalHints(cluster.get(2)) > 0);
long totalHints = countTotalHints(cluster.get(2));
// Decommission node 1...
// NOTE(review): the trailing ", 1);" on the next line looks like the tail of a dropped
// decommission call targeting node 1; as written the line is not valid Java.
assertEquals(4, endpointsKnownTo(cluster, 2));, 1);
// Wait until node 2 knows of only three endpoints.
await().pollDelay(1, SECONDS).until(() -> endpointsKnownTo(cluster, 2) == 3);
// ...and verify that all data still exists on either node 2 or 3.
verify(cluster, "decom_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
// Start node 4 back up and verify that all hints were delivered.
// NOTE(review): no startup statement for node 4 is visible here.
await().atMost(30, SECONDS).pollDelay(3, SECONDS).until(() -> count(cluster, "decom_hint_test", 4) >= totalHints);
// Now decommission both nodes 2 and 3...
// NOTE(review): the original comment line ended with the fragments ", 2);, 3);", which look like
// the tails of two dropped decommission calls for nodes 2 and 3.
await().pollDelay(1, SECONDS).until(() -> endpointsKnownTo(cluster, 4) == 1);
// ...and verify that even if we drop below the replication factor of 2, all data has been preserved.
verify(cluster, "decom_hint_test", 4, 0, 128, ConsistencyLevel.ONE);
/**
 * Writes 128 rows through node 2 into a keyspace with a replication factor of 2, waits for node 2
 * to accumulate hints, and then verifies the rows are readable through node 4 and, later, through
 * node 3 at consistency levels ONE and TWO.
 *
 * NOTE(review): this method looks truncated in this copy. It has no enclosing braces, the
 * try-with-resources header is never terminated, and the shutdown, bootstrap and restart steps
 * described by the comments have no statement. Confirm against the upstream file.
 */
public void shouldBootstrapWithHintsOutstanding() throws Exception
// Starts with three nodes, while the token supplier and topology are sized for four.
// NOTE(review): the builder chain ends without a visible start call or closing parenthesis.
try (Cluster cluster = builder().withNodes(3)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4, 1))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}"));
cluster.schemaChange(withKeyspace("CREATE TABLE %s.boot_hint_test (key int PRIMARY KEY, value int)"));
// NOTE(review): the comments below say node 3 is down, but no shutdown statement is visible.
// Write data using the second node as the coordinator...
populate(cluster, "boot_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
// ...and verify that we've accumulated hints intended for node 3, which is down.
Awaitility.await().until(() -> countTotalHints(cluster.get(2)) > 0);
long totalHints = countTotalHints(cluster.get(2));
// Bootstrap a new/4th node into the cluster...
// NOTE(review): no bootstrap statement is visible here.
// ...and verify that all data is available.
verify(cluster, "boot_hint_test", 4, 0, 128, ConsistencyLevel.ONE);
// Finally, bring node 3 back up and verify that all hints were delivered.
// NOTE(review): no startup statement for node 3 is visible here.
await().atMost(30, SECONDS).pollDelay(3, SECONDS).until(() -> count(cluster, "boot_hint_test", 3) >= totalHints);
// Read every row back through node 3 at both ONE and TWO.
verify(cluster, "boot_hint_test", 3, 0, 128, ConsistencyLevel.ONE);
verify(cluster, "boot_hint_test", 3, 0, 128, ConsistencyLevel.TWO);
private long countTotalHints(IInvokableInstance instance)
return instance.callOnInstance(() -> StorageMetrics.totalHints.getCount());
private long countHintsDelivered(IInvokableInstance instance)
return instance.callOnInstance(() -> HintsServiceMetrics.hintsSucceeded.getCount());
private void populate(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
for (int i = start; i < start + count; i++)
.execute("INSERT INTO " + KEYSPACE + '.' + table + " (key, value) VALUES (?, ?)", cl, i, i);
private void verify(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
for (int i = start; i < start + count; i++)
Object[][] row = cluster.coordinator(coordinator)
.execute("SELECT key, value FROM " + KEYSPACE + '.' + table + " WHERE key = ?", cl, i);
assertRows(row, row(i, i));
private long count(Cluster cluster, String table, int node)
return (Long) cluster.get(node).executeInternal("SELECT COUNT(*) FROM " + KEYSPACE + '.' + table)[0][0];
private int endpointsKnownTo(Cluster cluster, int node)
return cluster.get(node).callOnInstance(() -> StorageService.instance.getTokenMetadata().getAllEndpoints().size());