/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.distributed.shared.AssertUtils.*;
// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
public class SimpleReadWriteTest extends SharedClusterTestBase
{
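/**
* Writes a different row directly to each of the three nodes, bypassing the coordinator, then verifies
* that a coordinated read at ALL merges the rows from all replicas.
*/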
@Test
public void coordinatorReadTest() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
ConsistencyLevel.ALL,
1),
row(1, 1, 1),
row(1, 2, 2),
row(1, 3, 3));
}
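/**
* Round-trips a 64 KiB value through the coordinator. The threshold constant below suggests the intent
* is a payload large enough to travel over the internode large-message path.
*/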
@Test
public void largeMessageTest() throws Throwable
{
int largeMessageThreshold = 1024 * 64;
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
StringBuilder builder = new StringBuilder();
for (int i = 0; i < largeMessageThreshold; i++)
builder.append('a');
String s = builder.toString();
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
ConsistencyLevel.ALL,
s);
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
ConsistencyLevel.ALL,
1),
row(1, 1, s));
}
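/**
* Writes through the coordinator at QUORUM, then verifies the row is present on each replica locally
* and via a coordinated QUORUM read.
*/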
@Test
public void coordinatorWriteTest() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
ConsistencyLevel.QUORUM);
// check that the write is visible on each replica via a local read
for (int i = 0; i < 3; i++)
{
assertRows(cluster.get(i + 1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
row(1, 1, 1));
}
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
ConsistencyLevel.QUORUM),
row(1, 1, 1));
}
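/**
* Seeds two of the three replicas directly, then reads at ALL so the empty third replica is contacted
* and repaired by read repair.
*/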
@Test
public void readRepairTest() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
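// node 3 was not written to, so it should have no data yet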
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
ConsistencyLevel.ALL), // reading at ALL ensures node3 is in the replica plan, triggering read repair
row(1, 1, 1));
// Verify that data got repaired to the third node
assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
row(1, 1, 1));
}
/**
* If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
*/
@Test
public void writeWithSchemaDisagreement() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
// Introduce schema disagreement
cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
Exception thrown = null;
try
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
ConsistencyLevel.ALL);
}
catch (RuntimeException e)
{
thrown = e;
}
Assert.assertNotNull("expected the write to fail due to the schema disagreement", thrown);
Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2"));
}
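/**
* A minimal, hedged alternative to the fixed getCause() chain above: walk the whole cause chain looking
* for a message fragment, without assuming how deeply the dtest framework nests the remote exception.
* This helper is an illustrative sketch and is not used by the tests in this class.
*/
private static boolean causeChainContains(Throwable t, String fragment)
{
for (Throwable cur = t; cur != null; cur = cur.getCause())
{
if (cur.getMessage() != null && cur.getMessage().contains(fragment))
return true;
}
return false;
}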
/**
* If a node isn't aware of a column but receives a mutation that doesn't touch that column, the write should succeed.
*/
@Test
public void writeWithInconsequentialSchemaDisagreement() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
// Introduce schema disagreement
cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
// this write shouldn't cause any problems because it doesn't write to the new column
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
ConsistencyLevel.ALL);
}
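/**
* Inserts 100 rows into a single partition and verifies that paged reads return the same results
* regardless of page size.
*/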
@Test
public void simplePagedReadsTest() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
int size = 100;
Object[][] results = new Object[size][];
for (int i = 0; i < size; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
ConsistencyLevel.QUORUM,
i, i);
results[i] = new Object[]{ 1, i, i };
}
// Make sure paged read returns same results with different page sizes
for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
{
assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
ConsistencyLevel.QUORUM,
pageSize),
results);
}
}
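/**
* Writes rows only to nodes 2 and 3, then verifies that paged reads at ALL return the full result set
* for every page size and, as a side effect, read-repair the missing rows onto node 1.
*/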
@Test
public void pagingWithRepairTest() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
int size = 100;
Object[][] results = new Object[size][];
for (int i = 0; i < size; i++)
{
// make sure the data lands on nodes 2 and 3, not on the coordinator (node 1)
cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
i, i);
results[i] = new Object[]{ 1, i, i };
}
// Make sure paged read returns same results with different page sizes
for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
{
assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
ConsistencyLevel.ALL,
pageSize),
results);
}
assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
results);
}
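/**
* Compares paged reads against a single-node reference cluster holding identical data: for each statement,
* the paged result from the shared cluster must match the unpaged result from the reference cluster.
*/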
@Test
public void pagingTests() throws Throwable
{
try (ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
for (int i = 0; i < 10; i++)
{
for (int j = 0; j < 10; j++)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
ConsistencyLevel.QUORUM,
i, j, i + i);
singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
ConsistencyLevel.QUORUM,
i, j, i + i);
}
}
int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50 };
String[] statements = new String[]{ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
"SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
"SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
"SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
};
for (String statement : statements)
{
for (int pageSize : pageSizes)
{
assertRows(cluster.coordinator(1)
.executeWithPaging(statement,
ConsistencyLevel.QUORUM, pageSize),
singleNode.coordinator(1)
.executeWithPaging(statement,
ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
}
}
}
}
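/**
* Verifies that 100 point reads at ALL produce exactly 100 local reads on each of nodes 1 and 2,
* as measured by the table's read latency metric.
*/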
@Test
public void metricsCountQueriesTest() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
for (int i = 0; i < 100; i++)
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
long readCount1 = readCount((IInvokableInstance) cluster.get(1));
long readCount2 = readCount((IInvokableInstance) cluster.get(2));
for (int i = 0; i < 100; i++)
cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1;
readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2;
assertEquals(readCount1, readCount2);
assertEquals(100, readCount1);
}
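/**
* Verifies that a partition deletion living in an sstable that is eligible for timestamp-based skipping
* still shadows an older row for the same partition on another node.
*/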
@Test
public void skippedSSTableWithPartitionDeletionTest() throws Throwable
{
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
// insert a partition tombstone on node 1; its deletion timestamp should end up being the sstable's minTimestamp
cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
// and a row from a different partition, to provide the sstable's min/max clustering
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
cluster.get(1).flush(KEYSPACE);
// expect a single sstable, where minTimestamp equals the timestamp of the partition delete
cluster.get(1).runOnInstance(() -> {
Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
.getColumnFamilyStore("tbl")
.getLiveSSTables();
assertEquals(1, sstables.size());
assertEquals(1, sstables.iterator().next().getMinTimestamp());
});
// on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
Object[][] rows = cluster.coordinator(1)
.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
ConsistencyLevel.ALL);
assertEquals(0, rows.length);
}
}
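/**
* As above, but with a second sstable whose minTimestamp exceeds the first sstable's maxTimestamp,
* checking that the sstable holding the partition deletion is not wrongly excluded from the merge.
*/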
@Test
public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
{
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
// insert a partition tombstone on node 1; its deletion timestamp should end up being the sstable's minTimestamp
cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
// and a row from a different partition, to provide the sstable's min/max clustering
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
cluster.get(1).flush(KEYSPACE);
// sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
// insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
// this sstable's minTimestamp is greater than the maxTimestamp of the first sstable. This would cause the
// first sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
// know what data and/or tombstones are present on other nodes
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
cluster.get(1).flush(KEYSPACE);
// on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
Object[][] rows = cluster.coordinator(1)
.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
ConsistencyLevel.ALL);
// we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
// node 1 (0, 6, 6) was not.
assertRows(rows, new Object[] { 0, 6, 6 });
}
}
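/**
* Variant of the previous test where the first sstable's maxTimestamp makes it a candidate for
* reinclusion in the merge; the partition deletion must still shadow the older row on node 2.
*/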
@Test
public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
{
// don't add skipped sstables back just because the partition deletion's timestamp is less than the local minimum timestamp
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
// insert a partition tombstone on node 1; its deletion timestamp should end up being the sstable's minTimestamp
cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
// and a row from a different partition, to provide the sstable's min/max clustering
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
cluster.get(1).flush(KEYSPACE);
// sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
// insert a row which is not shadowed by the partition delete and flush to a second sstable. The first sstable
// has a maxTimestamp greater than the min timestamp of all sstables, so it is a candidate for reinclusion in the
// merge. However, the second sstable's minTimestamp is greater than the partition deletion's timestamp. This
// would cause the first sstable not to be reincluded in the merge input, but we can't really make that
// decision as we don't know what data and/or tombstones are present on other nodes
cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
cluster.get(1).flush(KEYSPACE);
// on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
Object[][] rows = cluster.coordinator(1)
.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
ConsistencyLevel.ALL);
// we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
// node 1 (0, 6, 6) was not.
assertRows(rows, new Object[] { 0, 6, 6 });
}
}
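/**
* Returns the number of local reads recorded against tbl's read latency metric on the given instance.
*/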
private long readCount(IInvokableInstance instance)
{
return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
}
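/**
* Defensively copies a varargs array of rows. Currently unused in this class; kept as a convenience
* for composing expected results.
*/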
private static Object[][] rows(Object[]...rows)
{
Object[][] r = new Object[rows.length][];
System.arraycopy(rows, 0, r, 0, rows.length);
return r;
}
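/**
* A hedged sketch extracting the repeated runOnInstance block from the sstable-skipping tests above:
* asserts that the instance has exactly one live sstable for tbl and that its minTimestamp matches.
* Illustrative only and not wired into the tests; it assumes the KEYSPACE/tbl names used throughout.
*/
private static void assertSingleSSTableMinTimestamp(IInvokableInstance instance, long expectedMinTimestamp)
{
instance.runOnInstance(() -> {
Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
.getColumnFamilyStore("tbl")
.getLiveSSTables();
assertEquals(1, sstables.size());
assertEquals(expectedMinTimestamp, sstables.iterator().next().getMinTimestamp());
});
}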
}