blob: 69b074f5a1b4a476eb95ab1ea13a9a813bf871f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import static com.google.common.collect.Iterators.toArray;
import static java.lang.String.format;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
/**
* Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE
* avoid short reads that might happen when a limit is used and reconciliation accepts less rows than such limit.
*/
@RunWith(Parameterized.class)
public class ShortReadProtectionTest extends TestBaseImpl
{
private static final int NUM_NODES = 3;
private static final int[] PAGE_SIZES = new int[]{ 1, 10 };
private static Cluster cluster;
private Tester tester;
/**
* The consistency level to be used in reads. Internal writes will hit the minimum number of replicas to match this
* consisstency level. With RF=3, this means one witten replica for CL=ALL, and two for CL=QUORUM.
*/
@Parameterized.Parameter
public ConsistencyLevel readConsistencyLevel;
/**
* Whether to flush data after mutations.
*/
@Parameterized.Parameter(1)
public boolean flush;
/**
* Whether paging is used for the distributed queries.
*/
@Parameterized.Parameter(2)
public boolean paging;
@Parameterized.Parameters(name = "{index}: read_cl={0} flush={1} paging={2}")
public static Collection<Object[]> data()
{
List<Object[]> result = new ArrayList<>();
for (ConsistencyLevel readConsistencyLevel : Arrays.asList(ALL, QUORUM))
for (boolean flush : BOOLEANS)
for (boolean paging : BOOLEANS)
result.add(new Object[]{ readConsistencyLevel, flush, paging });
return result;
}
@BeforeClass
public static void setupCluster() throws IOException
{
cluster = init(Cluster.build()
.withNodes(NUM_NODES)
.withConfig(config -> config.set("hinted_handoff_enabled", false))
.start());
}
@AfterClass
public static void teardownCluster()
{
if (cluster != null)
cluster.close();
}
@Before
public void setupTester()
{
tester = new Tester(readConsistencyLevel, flush, paging);
}
@After
public void teardownTester()
{
tester.dropTable();
}
/**
* Tests SRP for tables with no clustering columns and with a deleted row.
* <p>
* See CASSANDRA-13880.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13880()}.
*/
@Test
public void testSkinnyTableWithoutLiveRows()
{
tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
.allNodes("INSERT INTO %s (id) VALUES (0) USING TIMESTAMP 0")
.toNode1("DELETE FROM %s WHERE id = 0")
.assertRows("SELECT DISTINCT id FROM %s WHERE id = 0")
.assertRows("SELECT id FROM %s WHERE id = 0 LIMIT 1");
}
/**
* Tests SRP for tables with no clustering columns and with alternated live and deleted rows.
* <p>
* See CASSANDRA-13747.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13747()}.
*/
@Test
public void testSkinnyTableWithLiveRows()
{
tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
.allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d) USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
.toNode1("DELETE FROM %s WHERE id IN (1, 0, 4, 6, 3)") // delete every other row
.assertRows("SELECT DISTINCT token(id), id FROM %s",
row(token(5), 5), row(token(8), 8), row(token(2), 2), row(token(7), 7), row(token(9), 9));
}
/**
* Tests SRP for tables with no clustering columns and with complementary deleted rows.
* <p>
* See CASSANDRA-13595.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13595()}.
*/
@Test
public void testSkinnyTableWithComplementaryDeletions()
{
tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
.allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d) USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
.toNode1("DELETE FROM %s WHERE id IN (5, 8, 2, 7, 9)") // delete every other row
.toNode2("DELETE FROM %s WHERE id IN (1, 0, 4, 6)") // delete every other row but the last one
.assertRows("SELECT id FROM %s LIMIT 1", row(3))
.assertRows("SELECT DISTINCT id FROM %s LIMIT 1", row(3));
}
/**
* Tests SRP when more than one row is missing.
* <p>
* See CASSANDRA-12872.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_12872()}.
*/
@Test
public void testMultipleMissedRows()
{
tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
.allNodes(0, 4, i -> format("INSERT INTO %%s (pk, ck) VALUES (0, %d) USING TIMESTAMP 0", i))
.toNode1("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2, 3)",
"INSERT INTO %s (pk, ck) VALUES (0, 5)")
.toNode2("INSERT INTO %s (pk, ck) VALUES (0, 4)")
.assertRows("SELECT ck FROM %s WHERE pk = 0 LIMIT 2", row(0), row(4));
}
/**
* Tests SRP with deleted rows at the beginning of the partition and ascending order.
* <p>
* See CASSANDRA-9460.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_short_read()} together with
* {@link #testDescendingOrder()}.
*/
@Test
public void testAscendingOrder()
{
tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
.allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES (0, %d, %d) USING TIMESTAMP 0", i, i * 10))
.toNode1("DELETE FROM %s WHERE k=0 AND c=1")
.toNode2("DELETE FROM %s WHERE k=0 AND c=2")
.toNode3("DELETE FROM %s WHERE k=0 AND c=3")
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 1", row(4, 40))
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 2", row(4, 40), row(5, 50))
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 3", row(4, 40), row(5, 50), row(6, 60))
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 4", row(4, 40), row(5, 50), row(6, 60), row(7, 70));
}
/**
* Tests SRP behaviour with deleted rows at the end of the partition and descending order.
* <p>
* See CASSANDRA-9460.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_short_read()} together with
* {@link #testAscendingOrder()}.
*/
@Test
public void testDescendingOrder()
{
tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
.allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES (0, %d, %d) USING TIMESTAMP 0", i, i * 10))
.toNode1("DELETE FROM %s WHERE k=0 AND c=7")
.toNode2("DELETE FROM %s WHERE k=0 AND c=8")
.toNode3("DELETE FROM %s WHERE k=0 AND c=9")
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 1", row(6, 60))
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 2", row(6, 60), row(5, 50))
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 3", row(6, 60), row(5, 50), row(4, 40))
.assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 4", row(6, 60), row(5, 50), row(4, 40), row(3, 30));
}
/**
* Test short reads ultimately leaving no rows alive after a partition deletion.
* <p>
* See CASSANDRA-4000 and CASSANDRA-8933.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_short_read_delete()} and
* {@code consistency_test.py:TestConsistency.test_short_read_quorum_delete()}. Note that the
* {@link #readConsistencyLevel} test parameter ensures that both tests are covered.
*/
@Test
public void testDeletePartition()
{
tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0")
.assertRows("SELECT c, v FROM %s WHERE k=0 LIMIT 1");
}
/**
* Test short reads ultimately leaving no rows alive after a partition deletion when there is a static row.
*/
@Test
public void testDeletePartitionWithStatic()
{
tester.createTable("CREATE TABLE %s (k int, c int, v int, s int STATIC, PRIMARY KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) USING TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0")
.assertRows("SELECT c, v FROM %s WHERE k=0 LIMIT 1");
}
/**
* Test short reads ultimately leaving no rows alive after a clustering deletion.
*/
@Test
public void testDeleteClustering()
{
tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0 AND c=1")
.assertRows("SELECT * FROM %s WHERE k=0 LIMIT 1", row(0, 2, 20))
.toNode2("DELETE FROM %s WHERE k=0 AND c=2")
.assertRows("SELECT * FROM %s WHERE k=0 LIMIT 1");
}
/**
* Test short reads ultimately leaving no rows alive after a clustering deletion when there is a static row.
*/
@Test
public void testDeleteClusteringWithStatic()
{
tester.createTable("CREATE TABLE %s (k int, c int, v int, s int STATIC, PRIMARY KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) USING TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0 AND c=1")
.assertRows("SELECT k, c, v, s FROM %s WHERE k=0 LIMIT 1", row(0, 2, 20, 100))
.toNode2("DELETE FROM %s WHERE k=0 AND c=2")
.assertRows("SELECT k, c, v, s FROM %s WHERE k=0 LIMIT 1", row(0, null, null, 100));
}
/**
* Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions.
* <p>
* See CASSANDRA-15459
*/
@Test
public void testGroupByRegularRow()
{
tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0",
"DELETE FROM %s WHERE pk=0 AND ck=0",
"INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE pk=1 AND ck=1",
"INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
"DELETE FROM %s WHERE pk=2 AND ck=2")
.assertRows("SELECT * FROM %s LIMIT 1")
.assertRows("SELECT * FROM %s LIMIT 10")
.assertRows("SELECT * FROM %s GROUP BY pk LIMIT 1")
.assertRows("SELECT * FROM %s GROUP BY pk LIMIT 10")
.assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 1")
.assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 10");
}
/**
* Test GROUP BY with short read protection, particularly when there is a limit and static row deletions.
* <p>
* See CASSANDRA-15459
*/
@Test
public void testGroupByStaticRow()
{
tester.createTable("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))")
.toNode1("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0",
"INSERT INTO %s (pk, s) VALUES (0, null)",
"INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0")
.toNode2("INSERT INTO %s (pk, s) VALUES (1, null)",
"INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0",
"INSERT INTO %s (pk, s) VALUES (2, null)")
.assertRows("SELECT * FROM %s LIMIT 1")
.assertRows("SELECT * FROM %s LIMIT 10")
.assertRows("SELECT * FROM %s GROUP BY pk LIMIT 1")
.assertRows("SELECT * FROM %s GROUP BY pk LIMIT 10")
.assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 1")
.assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 10");
}
/**
* See CASSANDRA-13911.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911()}.
*/
@Test
public void testSkipEarlyTermination()
{
tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0)")
.toNode2("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2)")
.assertRows("SELECT DISTINCT pk FROM %s", row(0));
}
/**
* A regression test to prove that we can no longer rely on {@code !singleResultCounter.isDoneForPartition()} to
* abort single partition SRP early if a per partition limit is set.
* <p>
* See CASSANDRA-13911.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911_rows_srp()}.
*/
@Test
public void testSkipEarlyTerminationRows()
{
tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP 0",
"DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck = 1")
.toNode2("INSERT INTO %s (pk, ck) VALUES (0, 2) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (0, 3) USING TIMESTAMP 0",
"DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck = 0",
"INSERT INTO %s (pk, ck) VALUES (2, 1) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0")
.assertRows("SELECT pk, ck FROM %s PER PARTITION LIMIT 2 LIMIT 3", row(0, 0), row(0, 1), row(2, 2));
}
/**
* A regression test to prove that we can no longer rely on {@code !singleResultCounter.isDone()} to abort ranged
* partition SRP early if a per partition limit is set.
* <p>
* See CASSANDRA-13911.
* <p>
* Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911_partitions_srp()}.
*/
@Test
public void testSkipEarlyTerminationPartitions()
{
tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 0",
"DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck IN (0, 1)")
.toNode2("INSERT INTO %s (pk, ck) VALUES (0, 2) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (0, 3) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (2, 1) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (4, 0) USING TIMESTAMP 0",
"INSERT INTO %s (pk, ck) VALUES (4, 1) USING TIMESTAMP 0")
.assertRows("SELECT pk, ck FROM %s PER PARTITION LIMIT 2 LIMIT 4",
row(0, 0), row(0, 1), row(4, 0), row(4, 1));
}
private static long token(int key)
{
return (long) Murmur3Partitioner.instance.getToken(ByteBufferUtil.bytes(key)).getTokenValue();
}
private static class Tester
{
private static final AtomicInteger seqNumber = new AtomicInteger();
private final ConsistencyLevel readConsistencyLevel;
private final boolean flush, paging;
private final String qualifiedTableName;
private boolean flushed = false;
private Tester(ConsistencyLevel readConsistencyLevel, boolean flush, boolean paging)
{
this.readConsistencyLevel = readConsistencyLevel;
this.flush = flush;
this.paging = paging;
qualifiedTableName = KEYSPACE + ".t_" + seqNumber.getAndIncrement();
assert readConsistencyLevel == ALL || readConsistencyLevel == QUORUM
: "Only ALL and QUORUM consistency levels are supported";
}
private Tester createTable(String query)
{
cluster.schemaChange(format(query) + " WITH read_repair='NONE'");
return this;
}
private Tester allNodes(int startInclusive, int endExclusive, Function<Integer, String> querySupplier)
{
IntStream.range(startInclusive, endExclusive).mapToObj(querySupplier::apply).forEach(this::allNodes);
return this;
}
private Tester allNodes(String... queries)
{
for (String query : queries)
allNodes(query);
return this;
}
private Tester allNodes(String query)
{
cluster.coordinator(1).execute(format(query), ALL);
return this;
}
/**
* Internally runs the specified write queries in the first node. If the {@link #readConsistencyLevel} is
* QUORUM, then the write will also be internally done in the second replica, to simulate a QUORUM write.
*/
private Tester toNode1(String... queries)
{
return toNode(1, queries);
}
/**
* Internally runs the specified write queries in the second node. If the {@link #readConsistencyLevel} is
* QUORUM, then the write will also be internally done in the third replica, to simulate a QUORUM write.
*/
private Tester toNode2(String... queries)
{
return toNode(2, queries);
}
/**
* Internally runs the specified write queries in the third node. If the {@link #readConsistencyLevel} is
* QUORUM, then the write will also be internally done in the first replica, to simulate a QUORUM write.
*/
private Tester toNode3(String... queries)
{
return toNode(3, queries);
}
/**
* Internally runs the specified write queries in the specified node. If the {@link #readConsistencyLevel} is
* QUORUM the write will also be internally done in the next replica in the ring, to simulate a QUORUM write.
*/
private Tester toNode(int node, String... queries)
{
IInvokableInstance replica = cluster.get(node);
IInvokableInstance nextReplica = readConsistencyLevel == QUORUM
? cluster.get(node == NUM_NODES ? 1 : node + 1)
: null;
for (String query : queries)
{
String formattedQuery = format(query);
replica.executeInternal(formattedQuery);
if (nextReplica != null)
nextReplica.executeInternal(formattedQuery);
}
return this;
}
private Tester assertRows(String query, Object[]... expectedRows)
{
if (flush && !flushed)
{
cluster.stream().forEach(n -> n.flush(KEYSPACE));
flushed = true;
}
String formattedQuery = format(query);
cluster.coordinators().forEach(coordinator -> {
if (paging)
{
for (int fetchSize : PAGE_SIZES)
{
Iterator<Object[]> actualRows = coordinator.executeWithPaging(formattedQuery, readConsistencyLevel, fetchSize);
AssertUtils.assertRows(toArray(actualRows, Object[].class), expectedRows);
}
}
else
{
Object[][] actualRows = coordinator.execute(formattedQuery, readConsistencyLevel);
AssertUtils.assertRows(actualRows, expectedRows);
}
});
return this;
}
private String format(String query)
{
return String.format(query, qualifiedTableName);
}
private void dropTable()
{
cluster.schemaChange(format("DROP TABLE IF EXISTS %s"));
}
}
}