// blob: 5e409cbc31e03fefb981753cd513e110a0f38978 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.thresholds;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.SimpleStatement;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import net.bytebuddy.implementation.bind.annotation.This;
import org.apache.cassandra.concurrent.SEPExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.test.JavaDriverUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.TombstoneAbortException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
import org.apache.cassandra.utils.Shared;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.assertj.core.api.Assertions.assertThat;
public class TombstoneCountWarningTest extends TestBaseImpl
{
// Logger used by every fixture/test method to record which step is running.
private static final Logger logger = LoggerFactory.getLogger(TombstoneCountWarningTest.class);
// Value set for "tombstone_warn_threshold" in the cluster config built by setupClass().
private static final int TOMBSTONE_WARN = 50;
// Value set for "tombstone_failure_threshold" in the cluster config built by setupClass().
private static final int TOMBSTONE_FAIL = 100;
// In-JVM cluster shared by all tests in this class; created via Cluster.build(3) in setupClass().
private static ICluster<IInvokableInstance> CLUSTER;
// Java driver handle and session; both are created in setupClass() and closed in teardown().
private static com.datastax.driver.core.Cluster JAVA_DRIVER;
private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
/**
 * One-time fixture: builds and starts the shared cluster with the tombstone warn/failure
 * thresholds under test, installs the {@link BB} instance initializer, and opens a Java
 * driver session against the started cluster.
 *
 * @throws IOException if starting the cluster fails
 */
@BeforeClass
public static void setupClass() throws IOException
{
    logger.info("[test step : @BeforeClass] setupClass");

    Cluster.Builder clusterBuilder = Cluster.build(3);
    clusterBuilder.withConfig(config -> {
        config.set("tombstone_warn_threshold", TOMBSTONE_WARN)
              .set("tombstone_failure_threshold", TOMBSTONE_FAIL)
              .with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP);
    });
    clusterBuilder.withInstanceInitializer(BB::install);
    CLUSTER = clusterBuilder.start();

    // the driver is created from the running cluster, so this must follow start()
    com.datastax.driver.core.Cluster driver = JavaDriverUtils.create(CLUSTER);
    JAVA_DRIVER = driver;
    JAVA_DRIVER_SESSION = driver.connect();
}
/**
 * One-time cleanup: closes the Java driver session, the Java driver, and finally the in-JVM
 * cluster started by {@link #setupClass()}.
 *
 * Each resource is closed in its own {@code finally} step so that a failure while closing one
 * of them does not prevent the remaining ones from being released. Any of the three may be
 * {@code null} if {@link #setupClass()} failed part-way through.
 *
 * @throws Exception if closing any of the resources fails
 */
@AfterClass
public static void teardown() throws Exception
{
    logger.info("[test step : @AfterClass] teardown");
    try
    {
        if (JAVA_DRIVER_SESSION != null)
            JAVA_DRIVER_SESSION.close();
    }
    finally
    {
        try
        {
            if (JAVA_DRIVER != null)
                JAVA_DRIVER.close();
        }
        finally
        {
            // the cluster is closed last: the driver connections point at its nodes
            if (CLUSTER != null)
                CLUSTER.close();
        }
    }
}
/**
 * Per-test fixture: drops the test keyspace if present, re-initialises it through
 * {@code init(CLUSTER)}, and creates the {@code tbl} table used by every test.
 */
@Before
public void setup()
{
    logger.info("[test step : @Before] setup");

    String dropKeyspace = "DROP KEYSPACE IF EXISTS " + KEYSPACE;
    String createTable = "CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))";

    CLUSTER.schemaChange(dropKeyspace);
    init(CLUSTER);
    CLUSTER.schemaChange(createTable);
}
/**
 * Calls {@code DatabaseDescriptor.setReadThresholdsEnabled(value)} on every instance of the
 * shared cluster.
 *
 * @param value the flag passed to {@code setReadThresholdsEnabled} on each instance
 */
private static void enable(boolean value)
{
    CLUSTER.stream()
           .forEach(node -> node.runOnInstance(() -> DatabaseDescriptor.setReadThresholdsEnabled(value)));
}
/**
 * Runs the {@link #noWarnings(String)} scenario with a query restricted to partition {@code pk=1}.
 */
@Test
public void noWarningsSinglePartition()
{
    logger.info("[test step : @Test] noWarningsSinglePartition");
    String singlePartitionQuery = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1";
    noWarnings(singlePartitionQuery);
}
/**
 * Runs the {@link #noWarnings(String)} scenario with an unrestricted table scan.
 *
 * The query deliberately has no {@code WHERE} clause so that it differs from
 * {@link #noWarningsSinglePartition()} and matches the query used by the other
 * {@code *Scan} tests in this class.
 */
@Test
public void noWarningsScan()
{
    logger.info("[test step : @Test] noWarningsScan");
    noWarnings("SELECT * FROM " + KEYSPACE + ".tbl");
}
/**
 * Writes {@code TOMBSTONE_WARN} rows with a null {@code v} into partition {@code pk=1}, then,
 * with read thresholds first enabled and then disabled, runs {@code cql} through the
 * coordinator of node 1 and through the Java driver, asserting each time that the warning
 * list is empty and that the warn/abort counters did not move.
 *
 * @param cql the SELECT statement to execute
 */
public void noWarnings(String cql)
{
    String insert = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)";
    int ck = 0;
    while (ck < TOMBSTONE_WARN)
    {
        CLUSTER.coordinator(1).execute(insert, ConsistencyLevel.ALL, ck);
        ck++;
    }

    for (boolean thresholdsEnabled : new boolean[]{ true, false })
    {
        enable(thresholdsEnabled);

        // coordinator path
        SimpleQueryResult coordinatorResult = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
        List<String> coordinatorWarnings = coordinatorResult.warnings();
        Assert.assertEquals(Collections.emptyList(), coordinatorWarnings);

        // native protocol path
        List<String> driverWarnings = driverQueryAll(cql).getExecutionInfo().getWarnings();
        Assert.assertEquals(Collections.emptyList(), driverWarnings);

        assertWarnAborts(0, 0, 0);
    }
}
/**
 * Runs the {@code warnThreshold} scenario with a query restricted to partition {@code pk = 1}.
 */
@Test
public void warnThresholdSinglePartition()
{
    logger.info("[test step : @Test] warnThresholdSinglePartition");
    String singlePartitionQuery = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1";
    warnThreshold(singlePartitionQuery, false);
}
/**
 * Runs the {@code warnThreshold} scenario with an unrestricted table scan.
 */
@Test
public void warnThresholdScan()
{
    logger.info("[test step : @Test] warnThresholdScan");
    String scanQuery = "SELECT * FROM " + KEYSPACE + ".tbl";
    warnThreshold(scanQuery, true);
}
/**
 * Writes {@code TOMBSTONE_WARN + 1} rows with a null {@code v} into partition {@code pk=1} and
 * checks the warnings returned for {@code cql}, first with read thresholds enabled and then
 * with them disabled, through both the node 1 coordinator and the Java driver.
 *
 * @param cql    the SELECT statement to execute
 * @param isScan whether {@code cql} is a scan; with thresholds disabled a scan is expected to
 *               return no warnings, while a non-scan is expected to return exactly one
 */
private void warnThreshold(String cql, boolean isScan)
{
    final int tombstones = TOMBSTONE_WARN + 1;
    String insert = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)";
    for (int ck = 0; ck < tombstones; ck++)
        CLUSTER.coordinator(1).execute(insert, ConsistencyLevel.ALL, ck);

    // --- thresholds enabled: exactly one warning carrying the threshold message ---
    enable(true);
    String enabledMessage = "nodes scanned up to " + tombstones + " tombstones and issued tombstone warnings for query " + cql;

    List<String> coordinatorWarnings = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL).warnings();
    Assertions.assertThat(Iterables.getOnlyElement(coordinatorWarnings)).contains(enabledMessage);
    assertWarnAborts(1, 0, 0);

    List<String> driverWarnings = driverQueryAll(cql).getExecutionInfo().getWarnings();
    Assertions.assertThat(Iterables.getOnlyElement(driverWarnings)).contains(enabledMessage);
    assertWarnAborts(2, 0, 0);

    // --- thresholds disabled ---
    enable(false);
    String disabledPrefix = "Read " + tombstones + " live rows and " + tombstones + " tombstone cells for query " + cql;
    Consumer<List<String>> verifyDisabled = observed -> {
        // client warnings are currently coordinator only, so if present only 1 is expected
        if (!isScan)
        {
            Assertions.assertThat(Iterables.getOnlyElement(observed)).startsWith(disabledPrefix);
            return;
        }
        // Scans perform multiple ReadCommands, which will not propagate the warnings to the
        // top-level coordinator; so no warnings are expected
        Assertions.assertThat(observed).isEmpty();
    };

    SimpleQueryResult disabledResult = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
    verifyDisabled.accept(disabledResult.warnings());
    assertWarnAborts(2, 0, 0);

    verifyDisabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
    assertWarnAborts(2, 0, 0);
}
/**
 * Runs the {@code failThreshold} scenario with a query restricted to partition {@code pk = 1}.
 *
 * @throws UnknownHostException propagated from {@code failThreshold}
 */
@Test
public void failThresholdSinglePartition() throws UnknownHostException
{
    logger.info("[test step : @Test] failThresholdSinglePartition");
    String singlePartitionQuery = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1";
    failThreshold(singlePartitionQuery, false);
}
/**
 * Runs the {@code failThreshold} scenario with an unrestricted table scan.
 *
 * @throws UnknownHostException propagated from {@code failThreshold}
 */
@Test
public void failThresholdScan() throws UnknownHostException
{
    logger.info("[test step : @Test] failThresholdScan");
    String scanQuery = "SELECT * FROM " + KEYSPACE + ".tbl";
    failThreshold(scanQuery, true);
}
/**
 * Writes {@code TOMBSTONE_FAIL + 1} rows with a null {@code v} into partition {@code pk=1} and
 * verifies that {@code cql} fails, checking the reported failure details, client warnings and
 * warn/abort counters in four steps:
 * <ol>
 *   <li>thresholds enabled, query executed internally on node 1;</li>
 *   <li>thresholds enabled, query executed through the Java driver;</li>
 *   <li>thresholds disabled, query executed internally on node 1;</li>
 *   <li>thresholds disabled, query executed through the Java driver.</li>
 * </ol>
 * The steps are order-dependent: {@code assertWarnAborts} compares against counters accumulated
 * by the previous steps.
 *
 * @param cql    the SELECT statement to execute
 * @param isScan whether {@code cql} is a scan; only affects the client-warning expectation in
 *               step 3
 * @throws UnknownHostException declared because of {@code InetAddress.getByAddress}
 */
private void failThreshold(String cql, boolean isScan) throws UnknownHostException
{
for (int i = 0; i < TOMBSTONE_FAIL + 1; i++)
CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
// step 1: thresholds enabled, internal execution on node 1
enable(true);
List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
// capture client warnings and initialise coordinator warnings before running the query
ClientWarn.instance.captureWarnings();
CoordinatorWarnings.init();
try
{
QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
Assert.fail("Expected query failure");
}
catch (TombstoneAbortException e)
{
Assert.assertTrue(e.nodes >= 1 && e.nodes <= 3);
Assert.assertEquals(TOMBSTONE_FAIL + 1, e.tombstones);
// expected, client transport returns an error message and includes client warnings
}
CoordinatorWarnings.done();
CoordinatorWarnings.reset();
return ClientWarn.instance.getWarnings();
}).call();
Assertions.assertThat(Iterables.getOnlyElement(warnings))
.contains("nodes scanned over " + (TOMBSTONE_FAIL + 1) + " tombstones and aborted the query " + cql);
assertWarnAborts(0, 1, 1);
// step 2: thresholds enabled, execution through the Java driver
try
{
driverQueryAll(cql);
Assert.fail("Query should have thrown ReadFailureException");
}
catch (com.datastax.driver.core.exceptions.ReadFailureException e)
{
// without changing the client can't produce a better message...
// client does NOT include the message sent from the server in the exception; so the message doesn't work
// well in this case
Assertions.assertThat(e.getMessage()).contains("responses were required but only 0 replica responded"); // can't include ', 3 failed)' as sometimes it's 2
ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
assertThat(e.getFailuresMap())
.hasSizeBetween(1, 3)
// coordinator changes from run to run, so can't assert map as the key is dynamic... so assert the domain of keys and the single value expect
.containsValue(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code)
.hasKeySatisfying(new Condition<InetAddress>() {
public boolean matches(InetAddress value)
{
return expectedKeys.contains(value);
}
});
}
assertWarnAborts(0, 2, 1);
// step 3: thresholds disabled, internal execution on node 1
// when disabled warnings only happen if on the coordinator, and coordinator may not be the one replying
// to every query
enable(false);
// arm State with node 1's broadcast address; the BB.awaitResults hook calls State.syncAndClear(),
// which waits until BB.onFailure has reported a failure from that address
State.blockFor(CLUSTER.get(1).config().broadcastAddress());
warnings = CLUSTER.get(1).callsOnInstance(() -> {
ClientWarn.instance.captureWarnings();
try
{
QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
Assert.fail("Expected query failure");
}
catch (ReadFailureException e)
{
// with thresholds disabled a plain ReadFailureException is expected, not the TombstoneAbortException subtype
Assertions.assertThat(e).isNotInstanceOf(TombstoneAbortException.class);
Assertions.assertThat(e.failureReasonByEndpoint).isNotEmpty();
Assertions.assertThat(e.failureReasonByEndpoint.values())
.as("Non READ_TOO_MANY_TOMBSTONES exists")
.allMatch(RequestFailureReason.READ_TOO_MANY_TOMBSTONES::equals);
}
logger.warn("Checking warnings...");
return ClientWarn.instance.getWarnings();
}).call();
// client warnings are currently coordinator only, so if present only 1 is expected
if (isScan)
{
// Scans perform multiple ReadCommands, which will not propagate the warnings to the top-level coordinator; so no warnings are expected
Assertions.assertThat(warnings).isNull();
}
else
{
Assertions.assertThat(Iterables.getOnlyElement(warnings))
.startsWith("Read " + TOMBSTONE_FAIL + " live rows and " + (TOMBSTONE_FAIL + 1) + " tombstone cells for query " + cql);
}
assertWarnAborts(0, 2, 0);
// step 4: thresholds disabled, execution through the Java driver; State is armed again first
State.blockFor(CLUSTER.get(1).config().broadcastAddress());
try
{
driverQueryAll(cql);
Assert.fail("Query should have thrown ReadFailureException");
}
catch (com.datastax.driver.core.exceptions.ReadFailureException e)
{
// not checking the message as different cases exist for the failure, so the fact that this failed is enough
Assertions.assertThat(e.getFailuresMap())
.isNotEmpty();
Assertions.assertThat(e.getFailuresMap().values())
.as("Non READ_TOO_MANY_TOMBSTONES exists")
.allMatch(i -> i.equals(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
}
assertWarnAborts(0, 2, 0);
}
// Running total of the read-abort counters summed by totalReadAborts(); it is never reset
// inside this class, so each assertion only states how much the total should have grown.
private static long GLOBAL_READ_ABORTS = 0;

/**
 * Asserts the cluster-wide tombstone warning and abort counters for the test keyspace, and
 * that the global read-abort total has grown by {@code globalAborts} since the last
 * successful call.
 *
 * @param warns        expected value of {@code totalWarnings()}
 * @param aborts       expected value of {@code totalAborts()}
 * @param globalAborts expected increase of {@code totalReadAborts()} since the previous call
 */
private static void assertWarnAborts(int warns, int aborts, int globalAborts)
{
    long observedWarnings = totalWarnings();
    Assertions.assertThat(observedWarnings).as("warnings").isEqualTo(warns);

    long observedAborts = totalAborts();
    Assertions.assertThat(observedAborts).as("aborts").isEqualTo(aborts);

    long observedGlobalAborts = totalReadAborts();
    Assertions.assertThat(observedGlobalAborts).as("global aborts").isEqualTo(GLOBAL_READ_ABORTS + globalAborts);

    // only reached when the assertion above passed, so the baseline advances by the expected delta
    GLOBAL_READ_ABORTS += globalAborts;
}
/**
 * @return the sum, over all cluster instances, of the {@code ClientTombstoneWarnings} counter
 *         of the test keyspace
 */
private static long totalWarnings()
{
    String counterName = "org.apache.cassandra.metrics.keyspace.ClientTombstoneWarnings." + KEYSPACE;
    return CLUSTER.stream()
                  .mapToLong(node -> node.metrics().getCounter(counterName))
                  .sum();
}
/**
 * @return the sum, over all cluster instances, of the {@code ClientTombstoneAborts} counter
 *         of the test keyspace
 */
private static long totalAborts()
{
    String counterName = "org.apache.cassandra.metrics.keyspace.ClientTombstoneAborts." + KEYSPACE;
    return CLUSTER.stream()
                  .mapToLong(node -> node.metrics().getCounter(counterName))
                  .sum();
}
/**
 * @return the sum, over all cluster instances, of the {@code Read-ALL} and {@code RangeSlice}
 *         client-request abort counters
 */
private static long totalReadAborts()
{
    String readAllAborts = "org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL";
    String rangeSliceAborts = "org.apache.cassandra.metrics.ClientRequest.Aborts.RangeSlice";
    return CLUSTER.stream()
                  .mapToLong(node -> node.metrics().getCounter(readAllAborts)
                                     + node.metrics().getCounter(rangeSliceAborts))
                  .sum();
}
/**
 * Executes {@code cql} through the shared Java driver session at consistency level ALL.
 *
 * @param cql the statement to execute
 * @return the driver's result set
 */
private static ResultSet driverQueryAll(String cql)
{
    SimpleStatement statement = new SimpleStatement(cql);
    statement.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
    return JAVA_DRIVER_SESSION.execute(statement);
}
/**
 * Hand-off point between the test and the {@link BB} interceptors: the test arms it with an
 * address via {@link #blockFor(InetSocketAddress)}, {@code BB.onFailure} reports failures via
 * {@link #onFailure(InetSocketAddress)}, and {@code BB.awaitResults} calls
 * {@link #syncAndClear()} to wait until a failure from the armed address has been reported.
 *
 * Both fields are volatile and are written and read by different threads, so every reader
 * takes a local snapshot of {@link #promise} and tolerates {@code null} instead of
 * dereferencing the field directly.
 */
@Shared
public static class State
{
    // use InetSocketAddress as InetAddressAndPort is @Isolated which means equality doesn't work due to different
    // ClassLoaders; InetSocketAddress is @Shared so safe to use between app and cluster class loaders
    public static volatile InetSocketAddress blockFor = null;
    public static volatile CompletableFuture<Void> promise = null;

    /**
     * Arms the state: the next {@link #syncAndClear()} waits for a failure from {@code address}.
     * Called from the test thread.
     *
     * @param address the address whose failure must be observed
     */
    public static void blockFor(InetSocketAddress address)
    {
        // publish the promise BEFORE the address: a thread that observes a non-null blockFor
        // written here is then guaranteed to also observe this promise
        promise = new CompletableFuture<>();
        blockFor = address;
    }

    /**
     * Reports a failure received from {@code address}; completes the pending promise when the
     * address equals the armed one. Called from {@code BB.onFailure}.
     *
     * @param address the address the failure was received from
     */
    public static void onFailure(InetSocketAddress address)
    {
        if (!address.equals(blockFor))
            return;
        // snapshot: syncAndClear() may null the field between the check above and this read
        CompletableFuture<Void> pending = promise;
        if (pending != null)
            pending.complete(null);
    }

    /**
     * If armed, waits until the pending promise is completed and then disarms the state;
     * does nothing when not armed. Called from {@code BB.awaitResults}.
     */
    public static void syncAndClear()
    {
        if (blockFor == null)
            return;
        // snapshot: another thread may clear the field concurrently
        CompletableFuture<Void> pending = promise;
        if (pending != null)
            pending.join();
        blockFor = null;
        promise = null;
    }
}
/**
 * ByteBuddy hooks installed into instance 1 only. They rebase {@code ReadCallback} and
 * {@code SEPExecutor} so that the named methods delegate to the static methods of this class,
 * which cooperate with {@link State}.
 */
public static class BB
{
// Instance initializer passed to the cluster builder; does nothing for any instance other than 1.
private static void install(ClassLoader cl, int instanceId)
{
if (instanceId != 1)
return;
// intercept ReadCallback.awaitResults and ReadCallback.onFailure, delegating to the methods below
new ByteBuddy().rebase(ReadCallback.class)
.method(named("awaitResults"))
.intercept(MethodDelegation.to(BB.class))
.method(named("onFailure"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
// intercept SEPExecutor.maybeExecuteImmediately, delegating to the method below
new ByteBuddy().rebase(SEPExecutor.class)
.method(named("maybeExecuteImmediately"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
// Replacement for ReadCallback.awaitResults: if State is armed, waits for the armed failure
// first, then runs the original method.
@SuppressWarnings("unused")
public static void awaitResults(@SuperCall Runnable zuper)
{
State.syncAndClear();
zuper.run();
}
// Replacement for ReadCallback.onFailure: reports the sender's address to State, then runs the
// original method.
@SuppressWarnings("unused")
public static void onFailure(InetAddressAndPort from, RequestFailureReason failureReason, @SuperCall Runnable zuper) throws Exception
{
State.onFailure(new InetSocketAddress(from.getAddress(), from.getPort()));
zuper.run();
}
// make sure to schedule the task rather than running inline...
// this is important as the read may block on the local version which can get the test to include it rather than
// block waiting, so by scheduling we make sure it's always fair
@SuppressWarnings("unused")
public static void maybeExecuteImmediately(Runnable task, @This SEPExecutor executor)
{
executor.execute(task);
}
}
}