blob: 83c2806079a58e948a6ca5f9d5df6a0d673c79ac [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.distributed.test;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Condition;
import org.junit.Assert;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.config.CassandraRelevantProperties.ALLOW_ALTER_RF_DURING_RANGE_MOVEMENT;
import static;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
import static;
import static;
import static;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
import static;
public class ReadRepairTest extends TestBaseImpl
* Tests basic behaviour of read repair with {@code BLOCKING} read repair strategy.
// Test entry point for the BLOCKING read repair strategy.
// NOTE(review): the method body is not visible in this view; presumably it delegates to
// testReadRepair(ReadRepairStrategy) — confirm against the full file.
public void testBlockingReadRepair() throws Throwable
* Tests basic behaviour of read repair with {@code NONE} read repair strategy.
// Test entry point for the NONE read repair strategy.
// NOTE(review): the method body is not visible in this view; presumably it delegates to
// testReadRepair(ReadRepairStrategy) — confirm against the full file.
public void testNoneReadRepair() throws Throwable
// Shared driver for the per-strategy read repair tests.
// Creates a cluster via Cluster.create(3) and a table whose read_repair option is the given
// strategy, inserts one row through executeInternal on nodes 1 and 2 only, reads it at QUORUM
// with node 3 as coordinator, and finally inspects node 3's own data.
// NOTE(review): the statement that follows the "verify that the third node doesn't have the row"
// comment and the other branch of the strategy check are not visible in this view — confirm
// against the full file before relying on the exact assertions.
private void testReadRepair(ReadRepairStrategy strategy) throws Throwable
try (Cluster cluster = init(Cluster.create(3)))
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int, c int, v int, PRIMARY KEY (k, c)) " +
String.format("WITH read_repair='%s'", strategy)));
// the same row and statements are reused for every write and read below
Object[] row = row(1, 1, 1);
String insertQuery = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
String selectQuery = withKeyspace("SELECT * FROM %s.t WHERE k=1");
// insert data in two nodes, simulating a quorum write that has missed one node
cluster.get(1).executeInternal(insertQuery, row);
cluster.get(2).executeInternal(insertQuery, row);
// verify that the third node doesn't have the row
// read with CL=QUORUM to trigger read repair
assertRows(cluster.coordinator(3).execute(selectQuery, QUORUM), row);
// verify whether the coordinator has the repaired row depending on the read repair strategy
if (strategy == ReadRepairStrategy.NONE)
assertRows(cluster.get(3).executeInternal(selectQuery), row);
// Checks how long a CL=ALL read that needs a blocking read repair takes when it times out.
// Lowers the read RPC timeout to 3000 ms on every instance, writes a row on nodes 1 and 2 only,
// confirms node 3 has no rows, then reads at CL=ALL from coordinator 1 and compares the elapsed
// wall-clock time with the timeout.
// NOTE(review): the try opener and whatever forces the read to time out (e.g. a message filter)
// are not visible in this view — confirm against the full file.
public void readRepairTimeoutTest() throws Throwable
final long reducedReadTimeout = 3000L;
try (Cluster cluster = init(builder().withNodes(3).start()))
cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout)));
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
// write the row on nodes 1 and 2 only, leaving node 3 behind
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
// no expected rows are passed, i.e. node 3 must return nothing at this point
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
final long start = currentTimeMillis();
cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
// reaching this line means the read returned normally, which the test treats as a failure
fail("Read timeout expected but it did not occur");
catch (Exception ex)
// the containing exception class was loaded by another class loader. Comparing the message as a workaround to assert the exception
long actualTimeTaken = currentTimeMillis() - start;
long magicDelayAmount = 100L; // it might not be the best way to check if the time taken is around the timeout value.
// Due to the delays, the actual time taken from client perspective is slightly more than the timeout value
Assert.assertTrue(actualTimeTaken > reducedReadTimeout);
// But it should not exceed too much
Assert.assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount);
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
row(1, 1, 1)); // the partition happened while the repaired node was sending back its ack, so the mutation should in fact have been applied.
// Checks that a read still returns the row when the read repair does not reach node 3.
// Writes a row on nodes 1 and 2 only, confirms node 3 has nothing, reads through coordinator 1
// expecting the row, and then asserts that node 3 still has no rows.
// NOTE(review): the step that makes the repair fail and the consistency-level argument of the
// coordinator read are not visible in this view — confirm against the full file.
public void failingReadRepairTest() throws Throwable
try (Cluster cluster = init(builder().withNodes(3).start()))
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
// write the row on nodes 1 and 2 only
for (int i = 1 ; i <= 2 ; ++i)
cluster.get(i).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
// no expected rows are passed, i.e. node 3 must return nothing before the read
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
row(1, 1, 1));
// Data was not repaired
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
// Exercises blocking read repair while node 4 believes node 2 is leaving the ring.
// Looks for an int key whose Murmur3 token lies strictly between node 1's and node 2's tokens,
// writes it on node 4 only, marks an endpoint as leaving in node 4's token metadata, reads the
// key at QUORUM through coordinator 4, and asserts that node 1 then holds the row.
// NOTE(review): the cluster builder expression inside init(...), the loop increment/exit, the
// argument passed to acceptsOnInstance and the filter that blocks #4 -> #3 are not visible in
// this view — confirm against the full file.
public void movingTokenReadRepairTest() throws Throwable
// TODO: fails with vnode enabled
try (Cluster cluster = init(, 3))
List<Token> tokens = cluster.tokens();
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
// candidate partition key; its token is recomputed on each pass of the loop
int i = 0;
while (true)
Token t = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(i));
// the list of tokens uses zero-based numbering, whereas the cluster nodes use one-based numbering
if (t.compareTo(tokens.get(2 - 1)) < 0 && t.compareTo(tokens.get(1 - 1)) > 0)
// write only to #4
cluster.get(4).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1)", i);
// mark #2 as leaving in #4
cluster.get(4).acceptsOnInstance((InetSocketAddress endpoint) -> {
StorageService.instance.getTokenMetadata().addLeavingEndpoint(InetAddressAndPort.getByAddressOverrideDefaults(endpoint.getAddress(), endpoint.getPort()));
// prevent #4 from reading or writing to #3, so our QUORUM must contain #2 and #4
// since #1 is taking over the range, this means any read-repair must make it to #1 as well
// (as a speculative repair in this case, as we prefer to send repair mutations to the initial
// set of read replicas, which are 2 and 3 here).
assertRows(cluster.coordinator(4).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
ConsistencyLevel.QUORUM, i),
row(i, 1, 1));
// verify that #1 receives the write
assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", i),
row(i, 1, 1));
* Test that there is no read repair with RF=1, and that altering it to RF>1 doesn't trigger any repair by
* itself but following queries will use read repair accordingly with the new RF.
// Walks a 2-node cluster from RF=1 to RF=2: creates the keyspace with replication_factor 1,
// writes one row through node 1, reads it at CL=ALL from every coordinator, alters the keyspace
// to replication_factor 2, and reads at CL=ALL again before checking both nodes' own data.
// NOTE(review): the flush call announced by the "flush to ensure reads come from sstables"
// comment and some per-node assertions are not visible in this view — confirm against the full file.
public void alterRFAndRunReadRepair() throws Throwable
try (Cluster cluster = builder().withNodes(2).start())
cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " +
"{'class': 'SimpleStrategy', 'replication_factor': 1}"));
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int PRIMARY KEY, a int, b int)" +
" WITH read_repair='blocking'"));
// insert a row that will only get to one node due to the RF=1
Object[] row = row(1, 1, 1);
cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.t (k, a, b) VALUES (?, ?, ?)"), row);
// flush to ensure reads come from sstables
// at RF=1 it shouldn't matter which node we query, as the data should always come from the only replica
String query = withKeyspace("SELECT * FROM %s.t WHERE k = 1");
for (int i = 1; i <= cluster.size(); i++)
assertRows(cluster.coordinator(i).execute(query, ALL), row);
// at RF=1 the previous queries shouldn't have triggered read repair
assertRows(cluster.get(1).executeInternal(query), row);
// alter RF
cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = " +
"{'class': 'SimpleStrategy', 'replication_factor': 2}"));
// altering the RF shouldn't have triggered any read repair
assertRows(cluster.get(1).executeInternal(query), row);
// query again at CL=ALL, this time the data should be repaired
assertRows(cluster.coordinator(2).execute(query, ALL), row);
assertRows(cluster.get(1).executeInternal(query), row);
assertRows(cluster.get(2).executeInternal(query), row);
// In-memory variant of the range-slice-with-tombstones test.
// NOTE(review): the body is not visible in this view; presumably it calls
// testRangeSliceQueryWithTombstones(boolean) without flushing — confirm against the full file.
public void testRangeSliceQueryWithTombstonesInMemory() throws Throwable
// On-disk variant of the range-slice-with-tombstones test.
// NOTE(review): the body is not visible in this view; presumably it calls
// testRangeSliceQueryWithTombstones(boolean) with flushing enabled — confirm against the full file.
public void testRangeSliceQueryWithTombstonesOnDisk() throws Throwable
* Verify that range queries with CL>ONE don't do unnecessary read-repairs when there are tombstones.
* <p>
* See CASSANDRA-8989 and CASSANDRA-9502.
* <p>
* Migrated from Python dtest
// Shared driver: on a cluster built with Cluster.create(2), writes a 10x10 grid of rows at CL=ALL,
// layers partition and row deletions on top (covering existing and never-written keys), optionally
// flushes, runs a batch of QUORUM reads, and asserts that the read repair request count reported
// by ReadRepairTester for node 1 and table "t" is zero.
// The flush parameter selects whether all nodes are flushed before the reads.
// NOTE(review): the flush statement inside the per-node loop is not visible in this view, and
// the nesting of the query loops cannot be determined from here — confirm against the full file.
private void testRangeSliceQueryWithTombstones(boolean flush) throws Throwable
try (Cluster cluster = init(Cluster.create(2)))
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int, c int, v int, PRIMARY KEY(k, c))"));
ICoordinator coordinator = cluster.coordinator(1);
// insert some rows in all nodes
String insertQuery = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
for (int k = 0; k < 10; k++)
for (int c = 0; c < 10; c++)
coordinator.execute(insertQuery, ALL, k, c, k * c);
// delete a subset of the inserted partitions, plus some others that don't exist
String deletePartitionQuery = withKeyspace("DELETE FROM %s.t WHERE k = ?");
for (int k = 5; k < 15; k++)
coordinator.execute(deletePartitionQuery, ALL, k);
// delete some of the rows of some of the partitions, including deleted and not deleted partitions
String deleteRowQuery = withKeyspace("DELETE FROM %s.t WHERE k = ? AND c = ?");
for (int k = 2; k < 7; k++)
for (int c = 0; c < 5; c++)
coordinator.execute(deleteRowQuery, ALL, k, c);
// delete some of the rows of some not-existent partitions, including deleted and never-written partitions
for (int k = 12; k < 17; k++)
for (int c = 0; c < 5; c++)
coordinator.execute(deleteRowQuery, ALL, k, c);
// flush all the nodes if specified
if (flush)
for (int n = 1; n <= cluster.size(); n++)
// run a bunch of queries verifying that they don't trigger read repair
coordinator.execute(withKeyspace("SELECT * FROM %s.t LIMIT 100"), QUORUM);
for (int k = 0; k < 15; k++)
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=?"), QUORUM, k);
for (int c = 0; c < 10; c++)
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=? AND c=?"), QUORUM, k, c);
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=? AND c>?"), QUORUM, k, c);
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=? AND c<?"), QUORUM, k, c);
// the query results are discarded; only the read repair request counter is asserted
long requests = ReadRepairTester.readRepairRequestsCount(cluster.get(1), "t");
assertEquals("No read repair requests were expected, found " + requests, 0, requests);
// Sets up diverging tombstones for one partition key across nodes, starts a CL=ALL slice read
// from coordinator 3 on a background thread, and prepares the config for a new instance with
// auto_bootstrap enabled.
// NOTE(review): large parts of this method are not visible in this view — the cluster builder
// head, the end of the CREATE TABLE statement, the verbs/filter lambda body that uses the two
// conditions, and everything after the new instance config (bootstrap, resuming the read,
// assertions, executor shutdown). Confirm against the full file.
public void readRepairRTRangeMovementTest() throws Throwable
// single-thread pool used to run the coordinator read concurrently with the rest of the test
ExecutorService es = Executors.newFixedThreadPool(1);
String key = "test1";
// the read request timeout is set to Integer.MAX_VALUE milliseconds for this cluster
try (Cluster cluster = init(
.withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK)
.set("read_request_timeout", String.format("%dms", Integer.MAX_VALUE)))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
cluster.schemaChange("CREATE TABLE distributed_test_keyspace.tbl (\n" +
" key text,\n" +
" column1 int,\n" +
" PRIMARY KEY (key, column1)\n" +
cluster.forEach(i -> i.runOnInstance(() -> open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction()));
// nodes 1 and 2: partition deletion @50, range deletion [10, 100) @80, row deletion of column1=30 @70
// (presumably all three statements form the loop body — braces are not visible here)
for (int i = 1; i <= 2; i++)
cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 50 WHERE key=?;", key);
cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 80 WHERE key=? and column1 >= ? and column1 < ?;", key, 10, 100);
cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 70 WHERE key=? and column1 = ?;", key, 30);
// node 3: a newer partition deletion @100
cluster.get(3).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 100 WHERE key=?;", key);
// pause the read until we have bootstrapped a new node below
Condition continueRead = newOneTimeCondition();
Condition readStarted = newOneTimeCondition();
// outbound message filter on traffic from node 3 to nodes 1 and 2
cluster.filters().outbound().from(3).to(1,2).verbs(, i1, iMessage) -> {
catch (InterruptedException e)
throw new RuntimeException(e);
return false;
// the slice [20, 40] is read at CL=ALL from coordinator 3 on the executor thread
Future<Object[][]> read = es.submit(() -> cluster.coordinator(3)
.execute("SELECT * FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= ?",
ALL, key, 20, 40));
IInstanceConfig config = cluster.newInstanceConfig();
config.set("auto_bootstrap", true);
* Range queries before CASSANDRA-11427 will trigger read repairs for purgeable tombstones on hosts that already
* compacted given tombstones. This will result in constant transfer and compaction actions sourced by few nodes
* seeding purgeable tombstones and triggered e.g. by periodical jobs scanning data range wise.
* <p>
* See CASSANDRA-11427.
* <p>
* Migrated from Python dtest
// On a cluster built with Cluster.create(2), creates a table with gc_grace_seconds=0 and
// compaction disabled, writes two rows and several partition/row tombstones at CL=ALL, force-compacts
// node 2, runs an unrestricted range query at CL=ALL, and asserts that the read repair request
// count reported by ReadRepairTester for node 1 and table "t" is zero.
// NOTE(review): the flush statement announced by the "flush single sstable with tombstones"
// comment is not visible in this view — confirm against the full file.
public void testGCableTombstoneResurrectionOnRangeSliceQuery() throws Throwable
try (Cluster cluster = init(Cluster.create(2)))
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int, c int, PRIMARY KEY(k, c)) " +
"WITH gc_grace_seconds=0 AND compaction = " +
"{'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'}"));
ICoordinator coordinator = cluster.coordinator(1);
// insert some data
coordinator.execute(withKeyspace("INSERT INTO %s.t(k, c) VALUES (0, 0)"), ALL);
coordinator.execute(withKeyspace("INSERT INTO %s.t(k, c) VALUES (1, 1)"), ALL);
// create partition tombstones in all nodes for both existent and not existent partitions
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=0"), ALL); // exists
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=2"), ALL); // doesn't exist
// create row tombstones in all nodes for both existent and not existent rows
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=1 AND c=1"), ALL); // exists
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=3 AND c=1"), ALL); // doesn't exist
// flush single sstable with tombstones
// purge tombstones from node2 with compaction (gc_grace_seconds=0)
cluster.get(2).forceCompact(KEYSPACE, "t");
// run an unrestricted range query verifying that it doesn't trigger read repair
coordinator.execute(withKeyspace("SELECT * FROM %s.t"), ALL);
// the query result is discarded; only the read repair request counter is asserted
long requests = ReadRepairTester.readRepairRequestsCount(cluster.get(1), "t");
assertEquals("No read repair requests were expected, found " + requests, 0, requests);
// Covers a timestamp tie between a range tombstone and a partition deletion: a row is written
// at timestamp 1 and a range deletion (ck > 2) at timestamp 2 through coordinator 1 at CL=ALL,
// then node 3 alone receives a partition deletion, also at timestamp 2. The final CL=ALL read of
// ck in [1, 3) is asserted to return no rows.
// NOTE(review): the cluster builder chain after builder() is not visible in this view
// (presumably node count and instance initializer) — confirm against the full file.
public void partitionDeletionRTTimestampTieTest() throws Throwable
try (Cluster cluster = init(builder()
cluster.schemaChange(withKeyspace("CREATE TABLE distributed_test_keyspace.tbl0 (pk bigint,ck bigint,value bigint, PRIMARY KEY (pk, ck)) WITH CLUSTERING ORDER BY (ck ASC) AND read_repair='blocking';"));
long pk = 0L;
cluster.coordinator(1).execute("INSERT INTO distributed_test_keyspace.tbl0 (pk, ck, value) VALUES (?,?,?) USING TIMESTAMP 1", ConsistencyLevel.ALL, pk, 1L, 1L);
cluster.coordinator(1).execute("DELETE FROM distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=? AND ck>?;", ConsistencyLevel.ALL, pk, 2L);
// partition deletion applied on node 3 only, with the same timestamp as the range deletion above
cluster.get(3).executeInternal("DELETE FROM distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=?;", pk);
// no expected rows are passed, i.e. the read must come back empty
assertRows(cluster.coordinator(1).execute("SELECT * FROM distributed_test_keyspace.tbl0 WHERE pk=? AND ck>=? AND ck<?;",
ConsistencyLevel.ALL, pk, 1L, 3L));
public static class RRHelper
// Rebases BlockingReadRepair with ByteBuddy and loads the result into the given class loader
// using the INJECTION strategy, but only when nodeNumber is 1.
// NOTE(review): the builder steps between rebase(...) and load(...) are not visible in this
// view (presumably the method matcher, the delegation target and make()) — confirm against the
// full file.
static void install(ClassLoader cl, int nodeNumber)
// Only on coordinating node
if (nodeNumber == 1)
new ByteBuddy().rebase(BlockingReadRepair.class)
.load(cl, ClassLoadingStrategy.Default.INJECTION);
// This verifies new behaviour in 4.0 that was introduced in CASSANDRA-15369, but did not work
// on timestamp tie of RT and partition deletion: we should not generate RT bounds in such case,
// since monotonicity is already ensured by the partition deletion, and RT is unnecessary there.
// For details, see CASSANDRA-16453.
public static Object repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForWrite writePlan, @SuperCall Callable<Void> r) throws Exception
Assert.assertEquals(2, mutations.size());
for (Mutation value : mutations.values())
for (PartitionUpdate update : value.getPartitionUpdates())