/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.repair.BlockingReadRepair;
import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
import static org.apache.cassandra.net.Verb.READ_REQ;
import static org.junit.Assert.fail;
public class ReadRepairTest extends TestBaseImpl
{
/**
* Tests basic behaviour of read repair with {@code BLOCKING} read repair strategy.
*/
@Test
public void testBlockingReadRepair() throws Throwable
{
testReadRepair(ReadRepairStrategy.BLOCKING);
}
/**
* Tests basic behaviour of read repair with {@code NONE} read repair strategy.
*/
@Test
public void testNoneReadRepair() throws Throwable
{
testReadRepair(ReadRepairStrategy.NONE);
}
private void testReadRepair(ReadRepairStrategy strategy) throws Throwable
{
try (Cluster cluster = init(Cluster.create(3)))
{
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int, c int, v int, PRIMARY KEY (k, c)) " +
String.format("WITH read_repair='%s'", strategy)));
Object[] row = row(1, 1, 1);
String insertQuery = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
String selectQuery = withKeyspace("SELECT * FROM %s.t WHERE k=1");
// insert data in two nodes, simulating a quorum write that has missed one node
cluster.get(1).executeInternal(insertQuery, row);
cluster.get(2).executeInternal(insertQuery, row);
// verify that the third node doesn't have the row
assertRows(cluster.get(3).executeInternal(selectQuery));
// read with CL=QUORUM to trigger read repair
assertRows(cluster.coordinator(3).execute(selectQuery, QUORUM), row);
// verify whether the coordinator has the repaired row depending on the read repair strategy
if (strategy == ReadRepairStrategy.NONE)
assertRows(cluster.get(3).executeInternal(selectQuery));
else
assertRows(cluster.get(3).executeInternal(selectQuery), row);
}
}
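/**
* Tests that a blocking read repair fails with a read timeout when the acks for its repair mutations are dropped,
* even though the repair mutations themselves have already been applied on the out-of-date replica.
*/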
@Test
public void readRepairTimeoutTest() throws Throwable
{
final long reducedReadTimeout = 3000L;
try (Cluster cluster = init(builder().withNodes(3).start()))
{
cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout)));
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
cluster.verbs(READ_REPAIR_RSP).to(1).drop();
final long start = System.currentTimeMillis();
try
{
cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
fail("Read timeout expected but it did not occur");
}
catch (Exception ex)
{
// the exception class was loaded by another class loader, so compare the class name as a workaround to assert the exception type
Assert.assertTrue(ex.getClass().toString().contains("ReadTimeoutException"));
long actualTimeTaken = System.currentTimeMillis() - start;
long magicDelayAmount = 100L; // allowance used to check that the time taken is close to the timeout value
// Due to processing delays, the actual time taken from the client's perspective is slightly more than the timeout value
Assert.assertTrue(actualTimeTaken > reducedReadTimeout);
// But it should not exceed too much
Assert.assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount);
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
row(1, 1, 1)); // the simulated partition only dropped the ack sent back by the repaired node; the repair mutation itself should in fact have been applied
}
}
}
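/**
* Tests that a QUORUM read still succeeds when the blocking read repair mutation to the single out-of-date replica
* is dropped, and that the replica is consequently left unrepaired.
*/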
@Test
public void failingReadRepairTest() throws Throwable
{
try (Cluster cluster = init(builder().withNodes(3).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
for (int i = 1 ; i <= 2 ; ++i)
cluster.get(i).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
cluster.filters().verbs(READ_REPAIR_REQ.id).to(3).drop();
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
ConsistencyLevel.QUORUM),
row(1, 1, 1));
// Data was not repaired
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
}
}
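/**
* Tests that blocking read repair also sends its repair mutations to the pending replica that is taking over a
* range from a leaving node, so the repaired data is not lost once the range movement completes.
*/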
@Test
public void movingTokenReadRepairTest() throws Throwable
{
try (Cluster cluster = init(Cluster.create(4), 3))
{
List<Token> tokens = cluster.tokens();
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
int i = 0;
while (true)
{
Token t = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(i));
// the list of tokens uses zero-based numbering, whereas the cluster nodes use one-based numbering
if (t.compareTo(tokens.get(2 - 1)) < 0 && t.compareTo(tokens.get(1 - 1)) > 0)
break;
++i;
}
// write only to #4
cluster.get(4).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1)", i);
// mark #2 as leaving in #4
cluster.get(4).acceptsOnInstance((InetSocketAddress endpoint) -> {
StorageService.instance.getTokenMetadata().addLeavingEndpoint(InetAddressAndPort.getByAddressOverrideDefaults(endpoint.getAddress(), endpoint.getPort()));
PendingRangeCalculatorService.instance.update();
PendingRangeCalculatorService.instance.blockUntilFinished();
}).accept(cluster.get(2).broadcastAddress());
// prevent #4 from reading or writing to #3, so our QUORUM must contain #2 and #4
// since #1 is taking over the range, this means any read-repair must make it to #1 as well
// (as a speculative repair in this case, as we prefer to send repair mutations to the initial
// set of read replicas, which are 2 and 3 here).
cluster.filters().verbs(READ_REQ.id).from(4).to(3).drop();
cluster.filters().verbs(READ_REPAIR_REQ.id).from(4).to(3).drop();
assertRows(cluster.coordinator(4).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
ConsistencyLevel.QUORUM, i),
row(i, 1, 1));
// verify that #1 receives the write
assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", i),
row(i, 1, 1));
}
}
/**
* Test that there is no read repair with RF=1, and that altering it to RF>1 doesn't trigger any repair by itself,
* but subsequent queries will use read repair according to the new RF.
*/
@Test
public void alterRFAndRunReadRepair() throws Throwable
{
try (Cluster cluster = builder().withNodes(2).start())
{
cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " +
"{'class': 'SimpleStrategy', 'replication_factor': 1}"));
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int PRIMARY KEY, a int, b int)" +
" WITH read_repair='blocking'"));
// insert a row that will only get to one node due to the RF=1
Object[] row = row(1, 1, 1);
cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.t (k, a, b) VALUES (?, ?, ?)"), row);
// flush to ensure reads come from sstables
cluster.get(1).flush(KEYSPACE);
// at RF=1 it shouldn't matter which node we query, as the data should always come from the only replica
String query = withKeyspace("SELECT * FROM %s.t WHERE k = 1");
for (int i = 1; i <= cluster.size(); i++)
assertRows(cluster.coordinator(i).execute(query, ALL), row);
// at RF=1 the previous queries shouldn't have triggered read repair
assertRows(cluster.get(1).executeInternal(query), row);
assertRows(cluster.get(2).executeInternal(query));
// alter RF
System.setProperty(Config.PROPERTY_PREFIX + "allow_alter_rf_during_range_movement", "true");
cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = " +
"{'class': 'SimpleStrategy', 'replication_factor': 2}"));
// altering the RF shouldn't have triggered any read repair
assertRows(cluster.get(1).executeInternal(query), row);
assertRows(cluster.get(2).executeInternal(query));
// query again at CL=ALL, this time the data should be repaired
assertRows(cluster.coordinator(2).execute(query, ALL), row);
assertRows(cluster.get(1).executeInternal(query), row);
assertRows(cluster.get(2).executeInternal(query), row);
}
}
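/**
* Runs {@link #testRangeSliceQueryWithTombstones} with the tombstones still in memtables.
*/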
@Test
public void testRangeSliceQueryWithTombstonesInMemory() throws Throwable
{
testRangeSliceQueryWithTombstones(false);
}
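/**
* Runs {@link #testRangeSliceQueryWithTombstones} with the tombstones flushed to sstables.
*/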
@Test
public void testRangeSliceQueryWithTombstonesOnDisk() throws Throwable
{
testRangeSliceQueryWithTombstones(true);
}
/**
* Verify that range queries with CL>ONE don't do unnecessary read-repairs when there are tombstones.
* <p>
* See CASSANDRA-8989 and CASSANDRA-9502.
* <p>
* Migrated from Python dtest read_repair_test.py:TestReadRepair.test_range_slice_query_with_tombstones()
*/
private void testRangeSliceQueryWithTombstones(boolean flush) throws Throwable
{
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int, c int, v int, PRIMARY KEY(k, c))"));
ICoordinator coordinator = cluster.coordinator(1);
// insert some rows in all nodes
String insertQuery = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
for (int k = 0; k < 10; k++)
{
for (int c = 0; c < 10; c++)
coordinator.execute(insertQuery, ALL, k, c, k * c);
}
// delete a subset of the inserted partitions, plus some others that don't exist
String deletePartitionQuery = withKeyspace("DELETE FROM %s.t WHERE k = ?");
for (int k = 5; k < 15; k++)
{
coordinator.execute(deletePartitionQuery, ALL, k);
}
// delete some of the rows of some of the partitions, including deleted and not deleted partitions
String deleteRowQuery = withKeyspace("DELETE FROM %s.t WHERE k = ? AND c = ?");
for (int k = 2; k < 7; k++)
{
for (int c = 0; c < 5; c++)
coordinator.execute(deleteRowQuery, ALL, k, c);
}
// delete some of the rows of some not-existent partitions, including deleted and never-written partitions
for (int k = 12; k < 17; k++)
{
for (int c = 0; c < 5; c++)
coordinator.execute(deleteRowQuery, ALL, k, c);
}
// flush all the nodes if specified
if (flush)
{
for (int n = 1; n <= cluster.size(); n++)
cluster.get(n).flush(KEYSPACE);
}
// run a bunch of queries verifying that they don't trigger read repair
coordinator.execute(withKeyspace("SELECT * FROM %s.t LIMIT 100"), QUORUM);
for (int k = 0; k < 15; k++)
{
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=?"), QUORUM, k);
for (int c = 0; c < 10; c++)
{
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=? AND c=?"), QUORUM, k, c);
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=? AND c>?"), QUORUM, k, c);
coordinator.execute(withKeyspace("SELECT * FROM %s.t WHERE k=? AND c<?"), QUORUM, k, c);
}
}
long requests = ReadRepairTester.readRepairRequestsCount(cluster.get(1), "t");
assertEquals("No read repair requests were expected, found " + requests, 0, requests);
}
}
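/**
* Tests that a read requiring read repair of range tombstones completes successfully when a new node bootstraps,
* and thus a range movement happens, while the read is paused in flight.
*/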
@Test
public void readRepairRTRangeMovementTest() throws Throwable
{
ExecutorService es = Executors.newFixedThreadPool(1);
String key = "test1";
try (Cluster cluster = init(Cluster.build()
.withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK)
.set("read_request_timeout_in_ms", Integer.MAX_VALUE))
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
.withNodes(3)
.start()))
{
cluster.schemaChange("CREATE TABLE distributed_test_keyspace.tbl (\n" +
" key text,\n" +
" column1 int,\n" +
" PRIMARY KEY (key, column1)\n" +
") WITH CLUSTERING ORDER BY (column1 ASC)");
cluster.forEach(i -> i.runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction()));
for (int i = 1; i <= 2; i++)
{
cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 50 WHERE key=?;", key);
cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 80 WHERE key=? and column1 >= ? and column1 < ?;", key, 10, 100);
cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 70 WHERE key=? and column1 = ?;", key, 30);
cluster.get(i).flush(KEYSPACE);
}
cluster.get(3).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 100 WHERE key=?;", key);
cluster.get(3).flush(KEYSPACE);
// pause the read until we have bootstrapped a new node below
SimpleCondition continueRead = new SimpleCondition();
SimpleCondition readStarted = new SimpleCondition();
cluster.filters().outbound().from(3).to(1,2).verbs(Verb.READ_REQ.id).messagesMatching((i, i1, iMessage) -> {
try
{
readStarted.signalAll();
continueRead.await();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
return false;
}).drop();
Future<Object[][]> read = es.submit(() -> cluster.coordinator(3)
.execute("SELECT * FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= ?",
ConsistencyLevel.ALL, key, 20, 40));
readStarted.await();
IInstanceConfig config = cluster.newInstanceConfig();
config.set("auto_bootstrap", true);
cluster.bootstrap(config).startup();
continueRead.signalAll();
read.get();
}
finally
{
es.shutdown();
}
}
/**
* Before CASSANDRA-11427, range queries would trigger read repairs for purgeable tombstones on hosts that had
* already compacted those tombstones away. This resulted in constant transfer and compaction activity, sourced by
* the few nodes still seeding purgeable tombstones and triggered e.g. by periodic jobs scanning the data range by range.
* <p>
* See CASSANDRA-11427.
* <p>
* Migrated from Python dtest read_repair_test.py:TestReadRepair.test_gcable_tombstone_resurrection_on_range_slice_query()
*/
@Test
public void testGCableTombstoneResurrectionOnRangeSliceQuery() throws Throwable
{
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int, c int, PRIMARY KEY(k, c)) " +
"WITH gc_grace_seconds=0 AND compaction = " +
"{'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'}"));
ICoordinator coordinator = cluster.coordinator(1);
// insert some data
coordinator.execute(withKeyspace("INSERT INTO %s.t(k, c) VALUES (0, 0)"), ALL);
coordinator.execute(withKeyspace("INSERT INTO %s.t(k, c) VALUES (1, 1)"), ALL);
// create partition tombstones in all nodes for both existent and not existent partitions
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=0"), ALL); // exists
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=2"), ALL); // doesn't exist
// create row tombstones in all nodes for both existent and not existent rows
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=1 AND c=1"), ALL); // exists
coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k=3 AND c=1"), ALL); // doesn't exist
// flush single sstable with tombstones
cluster.get(1).flush(KEYSPACE);
cluster.get(2).flush(KEYSPACE);
// purge tombstones from node2 with compaction (gc_grace_seconds=0)
cluster.get(2).forceCompact(KEYSPACE, "t");
// run an unrestricted range query verifying that it doesn't trigger read repair
coordinator.execute(withKeyspace("SELECT * FROM %s.t"), ALL);
long requests = ReadRepairTester.readRepairRequestsCount(cluster.get(1), "t");
assertEquals("No read repair requests were expected, found " + requests, 0, requests);
}
}
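/**
* Tests read repair when a partition deletion and a range tombstone have the same timestamp (CASSANDRA-16453).
* {@link RRHelper} intercepts the repair mutations on the coordinator and verifies that they carry only a
* partition deletion and no rows.
*/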
@Test
public void partitionDeletionRTTimestampTieTest() throws Throwable
{
try (Cluster cluster = init(builder()
.withNodes(3)
.withInstanceInitializer(RRHelper::install)
.start()))
{
cluster.schemaChange(withKeyspace("CREATE TABLE distributed_test_keyspace.tbl0 (pk bigint,ck bigint,value bigint, PRIMARY KEY (pk, ck)) WITH CLUSTERING ORDER BY (ck ASC) AND read_repair='blocking';"));
long pk = 0L;
cluster.coordinator(1).execute("INSERT INTO distributed_test_keyspace.tbl0 (pk, ck, value) VALUES (?,?,?) USING TIMESTAMP 1", ConsistencyLevel.ALL, pk, 1L, 1L);
cluster.coordinator(1).execute("DELETE FROM distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=? AND ck>?;", ConsistencyLevel.ALL, pk, 2L);
cluster.get(3).executeInternal("DELETE FROM distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=?;", pk);
assertRows(cluster.coordinator(1).execute("SELECT * FROM distributed_test_keyspace.tbl0 WHERE pk=? AND ck>=? AND ck<?;",
ConsistencyLevel.ALL, pk, 1L, 3L));
}
}
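/**
* ByteBuddy helper that intercepts {@code BlockingReadRepair#repairPartition} on the coordinator so that the repair
* mutations produced by {@link ReadRepairTest#partitionDeletionRTTimestampTieTest()} can be inspected.
*/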
public static class RRHelper
{
static void install(ClassLoader cl, int nodeNumber)
{
// Only on coordinating node
if (nodeNumber == 1)
{
new ByteBuddy().rebase(BlockingReadRepair.class)
.method(named("repairPartition"))
.intercept(MethodDelegation.to(RRHelper.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
}
// This verifies new behaviour in 4.0 that was introduced in CASSANDRA-15369, but did not work when a range
// tombstone and a partition deletion had a timestamp tie: we should not generate RT bounds in that case, since
// monotonicity is already ensured by the partition deletion and the RT is unnecessary there.
// For details, see CASSANDRA-16453.
public static Object repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan, @SuperCall Callable<Void> r) throws Exception
{
Assert.assertEquals(2, mutations.size());
for (Mutation value : mutations.values())
{
for (PartitionUpdate update : value.getPartitionUpdates())
{
Assert.assertFalse(update.hasRows());
Assert.assertFalse(update.partitionLevelDeletion().isLive());
}
}
return r.call();
}
}
}