blob: e1868e280f67c838f872e4ff1181f39a07fc9007 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.guardrails;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
import org.apache.cassandra.service.disk.usage.DiskUsageState;
import org.assertj.core.api.Assertions;
import static net.bytebuddy.matcher.ElementMatchers.named;
/**
* Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}.
*/
public class GuardrailDiskUsageTest extends GuardrailTester
{
private static final int NUM_ROWS = 100;
private static final String WARN_MESSAGE = "Replica disk usage exceeds warning threshold";
private static final String FAIL_MESSAGE = "Write request failed because disk usage exceeds failure threshold";
private static Cluster cluster;
private static com.datastax.driver.core.Cluster driverCluster;
private static Session driverSession;
@BeforeClass
public static void setupCluster() throws IOException
{
// speed up the task that calculates and propagates the disk usage info
CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100);
// build a 2-node cluster with RF=1
cluster = init(Cluster.build(2)
.withInstanceInitializer(DiskStateInjection::install)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
.set("data_disk_usage_percentage_warn_threshold", 98)
.set("data_disk_usage_percentage_fail_threshold", 99)
.set("authenticator", "PasswordAuthenticator"))
.start(), 1);
// create a regular user, since the default superuser is excluded from guardrails
com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build();
Session session = c.connect())
{
session.execute("CREATE USER test WITH PASSWORD 'test'");
}
// connect using that superuser, we use the driver to get access to the client warnings
driverCluster = builder.withCredentials("test", "test").build();
driverSession = driverCluster.connect();
}
@AfterClass
public static void teardownCluster()
{
if (driverSession != null)
driverSession.close();
if (driverCluster != null)
driverCluster.close();
if (cluster != null)
cluster.close();
}
@Override
protected Cluster getCluster()
{
return cluster;
}
@Test
public void testDiskUsage() throws Throwable
{
schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v int)");
String insert = format("INSERT INTO %s(k, v) VALUES (?, 0)");
// With both nodes in SPACIOUS state, we can write without warnings nor failures
for (int i = 0; i < NUM_ROWS; i++)
{
ResultSet rs = driverSession.execute(insert, i);
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
}
// If the disk usage information about one node becomes unavailable, we can still write without warnings
DiskStateInjection.setState(getCluster(), 2, DiskUsageState.NOT_AVAILABLE);
for (int i = 0; i < NUM_ROWS; i++)
{
ResultSet rs = driverSession.execute(insert, i);
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
}
// If one node becomes STUFFED, the writes targeting that node will raise a warning, while the writes targetting
// the node that remains SPACIOUS will keep succeeding without warnings
DiskStateInjection.setState(getCluster(), 2, DiskUsageState.STUFFED);
int numWarnings = 0;
for (int i = 0; i < NUM_ROWS; i++)
{
ResultSet rs = driverSession.execute(insert, i);
List<String> warnings = rs.getExecutionInfo().getWarnings();
if (!warnings.isEmpty())
{
Assertions.assertThat(warnings).hasSize(1).anyMatch(s -> s.contains(WARN_MESSAGE));
numWarnings++;
}
}
Assertions.assertThat(numWarnings).isGreaterThan(0).isLessThan(NUM_ROWS);
// If the STUFFED node becomes FULL, the writes targeting that node will fail, while the writes targeting
// the node that remains SPACIOUS will keep succeeding without warnings
DiskStateInjection.setState(getCluster(), 2, DiskUsageState.FULL);
int numFailures = 0;
for (int i = 0; i < NUM_ROWS; i++)
{
try
{
ResultSet rs = driverSession.execute(insert, i);
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
}
catch (InvalidQueryException e)
{
Assertions.assertThat(e).hasMessageContaining(FAIL_MESSAGE);
numFailures++;
}
}
Assertions.assertThat(numFailures).isGreaterThan(0).isLessThan(NUM_ROWS);
// If both nodes are FULL, all queries will fail
DiskStateInjection.setState(getCluster(), 1, DiskUsageState.FULL);
for (int i = 0; i < NUM_ROWS; i++)
{
try
{
driverSession.execute(insert, i);
Assertions.fail("Should have failed");
}
catch (InvalidQueryException e)
{
numFailures++;
}
}
// Finally, if both nodes go back to SPACIOUS, all queries will succeed again
DiskStateInjection.setState(getCluster(), 1, DiskUsageState.SPACIOUS);
DiskStateInjection.setState(getCluster(), 2, DiskUsageState.SPACIOUS);
for (int i = 0; i < NUM_ROWS; i++)
{
ResultSet rs = driverSession.execute(insert, i);
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
}
}
/**
* ByteBuddy rule to override the disk usage state of each node.
*/
public static class DiskStateInjection
{
public static volatile DiskUsageState state = DiskUsageState.SPACIOUS;
private static void install(ClassLoader cl, int node)
{
new ByteBuddy().rebase(DiskUsageMonitor.class)
.method(named("getState"))
.intercept(MethodDelegation.to(DiskStateInjection.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
public static void setState(Cluster cluster, int node, DiskUsageState state)
{
IInvokableInstance instance = cluster.get(node);
instance.runOnInstance(() -> DiskStateInjection.state = state);
// wait for disk usage state propagation, all nodes must see it
InetAddressAndPort enpoint = InetAddressAndPort.getByAddress(instance.broadcastAddress());
cluster.forEach(n -> n.runOnInstance(() -> Util.spinAssertEquals(state, () -> DiskUsageBroadcaster.instance.state(enpoint), 60)));
}
@SuppressWarnings("unused")
public static DiskUsageState getState(long usagePercentage, @SuperCall Callable<DiskUsageState> zuper)
{
return state;
}
}
}