blob: 6b7daef26febaaba62f47f59bbe520a11a78c758 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.guardrails;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import static java.nio.ByteBuffer.allocate;
/**
* Tests the guardrail for the size of collections, {@link Guardrails#collectionSize}.
* <p>
* This test only includes the activation of the guardrail during sstable writes, all other cases are covered by
* {@link org.apache.cassandra.db.guardrails.GuardrailCollectionSizeTest}.
*/
public class GuardrailCollectionSizeOnSSTableWriteTest extends GuardrailTester
{
private static final int NUM_NODES = 2;
private static final int WARN_THRESHOLD = 1024;
private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4;
private static Cluster cluster;
private static com.datastax.driver.core.Cluster driverCluster;
private static Session driverSession;
@BeforeClass
public static void setupCluster() throws IOException
{
cluster = init(Cluster.build(NUM_NODES)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
.set("collection_size_warn_threshold", WARN_THRESHOLD + "B")
.set("collection_size_fail_threshold", FAIL_THRESHOLD + "B"))
.start());
cluster.disableAutoCompaction(KEYSPACE);
driverCluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
driverSession = driverCluster.connect();
}
@AfterClass
public static void teardownCluster()
{
if (driverSession != null)
driverSession.close();
if (driverCluster != null)
driverCluster.close();
if (cluster != null)
cluster.close();
}
@Override
protected Cluster getCluster()
{
return cluster;
}
@Test
public void testSetSize() throws Throwable
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, null)");
execute("INSERT INTO %s (k, v) VALUES (1, ?)", set());
execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (3, ?)", set(allocate(WARN_THRESHOLD / 2)));
assertNotWarnedOnFlush();
execute("INSERT INTO %s (k, v) VALUES (4, ?)", set(allocate(WARN_THRESHOLD)));
assertWarnedOnFlush(warnMessage("4"));
execute("INSERT INTO %s (k, v) VALUES (5, ?)", set(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD * 3 / 4)));
assertWarnedOnFlush(warnMessage("5"));
execute("INSERT INTO %s (k, v) VALUES (6, ?)", set(allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("6"));
execute("INSERT INTO %s (k, v) VALUES (7, ?)", set(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD * 3 / 4)));
assertFailedOnFlush(failMessage("7"));
}
@Test
public void testSetSizeFrozen()
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v frozen<set<blob>>)");
execute("INSERT INTO %s (k, v) VALUES (0, null)");
execute("INSERT INTO %s (k, v) VALUES (1, ?)", set());
execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", set(allocate(WARN_THRESHOLD)));
execute("INSERT INTO %s (k, v) VALUES (5, ?)", set(allocate(FAIL_THRESHOLD)));
// frozen collections size is not checked during sstable write
assertNotWarnedOnFlush();
}
@Test
public void testSetSizeWithUpdates()
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
execute("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
assertNotWarnedOnFlush();
execute("INSERT INTO %s (k, v) VALUES (1, ?)", set(allocate(WARN_THRESHOLD / 4)));
execute("UPDATE %s SET v = v + ? WHERE k = 1", set(allocate(WARN_THRESHOLD * 3 / 4)));
assertWarnedOnFlush(warnMessage("1"));
execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(FAIL_THRESHOLD / 4)));
execute("UPDATE %s SET v = v + ? WHERE k = 2", set(allocate(FAIL_THRESHOLD * 3 / 4)));
assertFailedOnFlush(failMessage("2"));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", set(allocate(FAIL_THRESHOLD)));
execute("UPDATE %s SET v = v - ? WHERE k = 4", set(allocate(FAIL_THRESHOLD)));
assertNotWarnedOnFlush();
}
@Test
public void testSetSizeAfterCompaction() throws Throwable
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
assertNotWarnedOnFlush();
execute("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
assertNotWarnedOnFlush();
assertNotWarnedOnCompact();
execute("INSERT INTO %s (k, v) VALUES (1, ?)", set(allocate(WARN_THRESHOLD * 3 / 4)));
assertNotWarnedOnFlush();
execute("UPDATE %s SET v = v + ? WHERE k = 1", set(allocate(WARN_THRESHOLD / 4)));
assertNotWarnedOnFlush();
assertWarnedOnCompact(warnMessage("1"));
execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(FAIL_THRESHOLD * 3 / 4)));
assertWarnedOnFlush(warnMessage("2"));
execute("UPDATE %s SET v = v + ? WHERE k = 2", set(allocate(FAIL_THRESHOLD / 4)));
assertWarnedOnFlush(warnMessage("2"));
assertFailedOnCompact(failMessage("2"));
execute("INSERT INTO %s (k, v) VALUES (3, ?)", set(allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("3"));
execute("UPDATE %s SET v = v - ? WHERE k = 3", set(allocate(FAIL_THRESHOLD)));
assertNotWarnedOnFlush();
assertNotWarnedOnCompact();
}
@Test
public void testListSize() throws Throwable
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, null)");
execute("INSERT INTO %s (k, v) VALUES (1, ?)", list());
execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (3, ?)", list(allocate(WARN_THRESHOLD / 2)));
assertNotWarnedOnFlush();
execute("INSERT INTO %s (k, v) VALUES (4, ?)", list(allocate(WARN_THRESHOLD)));
assertWarnedOnFlush(warnMessage("4"));
execute("INSERT INTO %s (k, v) VALUES (5, ?)", list(allocate(WARN_THRESHOLD / 2), allocate(WARN_THRESHOLD / 2)));
assertWarnedOnFlush(warnMessage("5"));
execute("INSERT INTO %s (k, v) VALUES (6, ?)", list(allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("6"));
execute("INSERT INTO %s (k, v) VALUES (7, ?)", list(allocate(FAIL_THRESHOLD / 2), allocate(FAIL_THRESHOLD / 2)));
assertFailedOnFlush(failMessage("7"));
}
@Test
public void testListSizeFrozen()
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v frozen<list<blob>>)");
execute("INSERT INTO %s (k, v) VALUES (0, null)");
execute("INSERT INTO %s (k, v) VALUES (1, ?)", list());
execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", list(allocate(WARN_THRESHOLD)));
execute("INSERT INTO %s (k, v) VALUES (5, ?)", list(allocate(FAIL_THRESHOLD)));
// frozen collections size is not checked during sstable write
assertNotWarnedOnFlush();
}
@Test
public void testListSizeWithUpdates()
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
execute("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
assertNotWarnedOnFlush();
execute("INSERT INTO %s (k, v) VALUES (1, ?)", list(allocate(WARN_THRESHOLD / 2)));
execute("UPDATE %s SET v = v + ? WHERE k = 1", list(allocate(WARN_THRESHOLD / 2)));
assertWarnedOnFlush(warnMessage("1"));
execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(FAIL_THRESHOLD / 2)));
execute("UPDATE %s SET v = v + ? WHERE k = 2", list(allocate(FAIL_THRESHOLD / 2)));
assertFailedOnFlush(failMessage("2"));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", list(allocate(FAIL_THRESHOLD)));
execute("UPDATE %s SET v = v - ? WHERE k = 4", list(allocate(FAIL_THRESHOLD)));
assertNotWarnedOnFlush();
}
@Test
public void testListSizeAfterCompaction() throws Throwable
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
assertNotWarnedOnFlush();
execute("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
assertNotWarnedOnFlush();
assertNotWarnedOnCompact();
execute("INSERT INTO %s (k, v) VALUES (1, ?)", list(allocate(WARN_THRESHOLD / 2)));
assertNotWarnedOnFlush();
execute("UPDATE %s SET v = v + ? WHERE k = 1", list(allocate(WARN_THRESHOLD / 2)));
assertNotWarnedOnFlush();
assertWarnedOnCompact(warnMessage("1"));
execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(FAIL_THRESHOLD / 2)));
assertWarnedOnFlush(warnMessage("2"));
execute("UPDATE %s SET v = v + ? WHERE k = 2", list(allocate(FAIL_THRESHOLD / 2)));
assertNotWarnedOnFlush();
assertFailedOnCompact(failMessage("2"));
execute("INSERT INTO %s (k, v) VALUES (3, ?)", list(allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("3"));
execute("UPDATE %s SET v[0] = ? WHERE k = 3", allocate(1));
assertNotWarnedOnFlush();
assertNotWarnedOnCompact();
}
@Test
public void testMapSize() throws Throwable
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, null)");
execute("INSERT INTO %s (k, v) VALUES (1, ?)", map());
execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1), allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (3, ?)", map(allocate(1), allocate(WARN_THRESHOLD / 2)));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(WARN_THRESHOLD / 2), allocate(1)));
assertNotWarnedOnFlush();
execute("INSERT INTO %s (k, v) VALUES (5, ?)", map(allocate(WARN_THRESHOLD), allocate(1)));
assertWarnedOnFlush(warnMessage("5"));
execute("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(1), allocate(WARN_THRESHOLD)));
assertWarnedOnFlush(warnMessage("6"));
execute("INSERT INTO %s (k, v) VALUES (7, ?)", map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
assertWarnedOnFlush(warnMessage("7"));
execute("INSERT INTO %s (k, v) VALUES (8, ?)", map(allocate(FAIL_THRESHOLD), allocate(1)));
assertFailedOnFlush(failMessage("8"));
execute("INSERT INTO %s (k, v) VALUES (9, ?)", map(allocate(1), allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("9"));
execute("INSERT INTO %s (k, v) VALUES (10, ?)", map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("10"));
}
@Test
public void testMapSizeFrozen()
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<blob, blob>>)");
execute("INSERT INTO %s (k, v) VALUES (0, null)");
execute("INSERT INTO %s (k, v) VALUES (1, ?)", map());
execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1), allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), allocate(WARN_THRESHOLD)));
execute("INSERT INTO %s (k, v) VALUES (5, ?)", map(allocate(WARN_THRESHOLD), allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
execute("INSERT INTO %s (k, v) VALUES (7, ?)", map(allocate(1), allocate(FAIL_THRESHOLD)));
execute("INSERT INTO %s (k, v) VALUES (8, ?)", map(allocate(FAIL_THRESHOLD), allocate(1)));
execute("INSERT INTO %s (k, v) VALUES (9, ?)", map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
// frozen collections size is not checked during sstable write
assertNotWarnedOnFlush();
}
@Test
public void testMapSizeWithUpdates()
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1), allocate(1)));
execute("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1), allocate(1)));
assertNotWarnedOnFlush();
execute("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1), allocate(WARN_THRESHOLD / 2)));
execute("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2), allocate(WARN_THRESHOLD / 2)));
assertWarnedOnFlush(warnMessage("1"));
execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(WARN_THRESHOLD / 4), allocate(1)));
execute("UPDATE %s SET v = v + ? WHERE k = 2", map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
assertWarnedOnFlush(warnMessage("2"));
execute("INSERT INTO %s (k, v) VALUES (3, ?)", map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
execute("UPDATE %s SET v = v + ? WHERE k = 3", map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
assertWarnedOnFlush(warnMessage("3"));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), allocate(FAIL_THRESHOLD / 2)));
execute("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2), allocate(FAIL_THRESHOLD / 2)));
assertFailedOnFlush(failMessage("4"));
execute("INSERT INTO %s (k, v) VALUES (5, ?)", map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
execute("UPDATE %s SET v = v + ? WHERE k = 5", map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
assertFailedOnFlush(failMessage("5"));
execute("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
execute("UPDATE %s SET v = v + ? WHERE k = 6", map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
assertFailedOnFlush(failMessage("6"));
}
@Test
public void testMapSizeAfterCompaction()
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
execute("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1), allocate(1)));
execute("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1), allocate(1)));
assertNotWarnedOnFlush();
assertNotWarnedOnCompact();
execute("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1), allocate(WARN_THRESHOLD / 2)));
assertNotWarnedOnFlush();
execute("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2), allocate(WARN_THRESHOLD / 2)));
assertNotWarnedOnFlush();
assertWarnedOnCompact(warnMessage("1"));
execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(WARN_THRESHOLD / 4), allocate(1)));
assertNotWarnedOnFlush();
execute("UPDATE %s SET v = v + ? WHERE k = 2", map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
assertNotWarnedOnFlush();
assertWarnedOnCompact(warnMessage("2"));
execute("INSERT INTO %s (k, v) VALUES (3, ?)", map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
assertNotWarnedOnFlush();
execute("UPDATE %s SET v = v + ? WHERE k = 3", map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
assertNotWarnedOnFlush();
assertWarnedOnCompact(warnMessage("3"));
execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), allocate(FAIL_THRESHOLD / 2)));
assertWarnedOnFlush(failMessage("4"));
execute("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2), allocate(FAIL_THRESHOLD / 2)));
assertWarnedOnFlush(warnMessage("4"));
assertFailedOnCompact(failMessage("4"));
execute("INSERT INTO %s (k, v) VALUES (5, ?)", map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
assertWarnedOnFlush(failMessage("5"));
execute("UPDATE %s SET v = v + ? WHERE k = 5", map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
assertWarnedOnFlush(warnMessage("5"));
assertFailedOnCompact(failMessage("5"));
execute("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
assertWarnedOnFlush(failMessage("6"));
execute("UPDATE %s SET v = v + ? WHERE k = 6", map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
assertWarnedOnFlush(warnMessage("6"));
assertFailedOnCompact(failMessage("6"));
}
@Test
public void testCompositePartitionKey()
{
schemaChange("CREATE TABLE %s (k1 int, k2 text, v set<blob>, PRIMARY KEY((k1, k2)))");
execute("INSERT INTO %s (k1, k2, v) VALUES (0, 'a', ?)", set(allocate(WARN_THRESHOLD)));
assertWarnedOnFlush(warnMessage("(0, 'a')"));
execute("INSERT INTO %s (k1, k2, v) VALUES (1, 'b', ?)", set(allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("(1, 'b')"));
}
@Test
public void testCompositeClusteringKey()
{
schemaChange("CREATE TABLE %s (k int, c1 int, c2 text, v set<blob>, PRIMARY KEY(k, c1, c2))");
execute("INSERT INTO %s (k, c1, c2, v) VALUES (1, 10, 'a', ?)", set(allocate(WARN_THRESHOLD)));
assertWarnedOnFlush(warnMessage("(1, 10, 'a')"));
execute("INSERT INTO %s (k, c1, c2, v) VALUES (2, 20, 'b', ?)", set(allocate(FAIL_THRESHOLD)));
assertFailedOnFlush(failMessage("(2, 20, 'b')"));
}
private void execute(String query, Object... args)
{
SimpleStatement stmt = new SimpleStatement(format(query), args);
stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
driverSession.execute(stmt);
}
private String warnMessage(String key)
{
return String.format("Detected collection v in row %s in table %s of size", key, qualifiedTableName);
}
private String failMessage(String key)
{
return String.format("Detected collection v in row %s in table %s of size", key, qualifiedTableName);
}
}