blob: a47c782ee45d6c5d9db89d112a42db3d72a2a220 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.metrics;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.Metrics;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.net.Verb;
import org.awaitility.core.ThrowingRunnable;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.await;
/**
 * Distributed test for {@link HintsServiceMetrics}.
 * <p>
 * Node 1 coordinates every write. Its mutation requests to node 2 and then to node 3 are dropped, so it has to
 * create hints for those nodes. Hint delivery is disturbed in two ways: the first hint requests sent by node 1 to
 * each of the other nodes are dropped, and the first hint applications on each of those nodes are made to throw.
 * The test then checks the hint metrics reported by every node.
 */
public class HintsServiceMetricsTest extends TestBaseImpl
{
    /** Total number of rows written by the test. */
    private static final int NUM_ROWS = 100;

    /** Number of hint applications that {@link FailHints} makes fail on each node where it is installed. */
    private static final int NUM_FAILURES_PER_NODE = 5;

    /** Number of hint requests from node 1 that are dropped for each of the other two nodes. */
    private static final int NUM_TIMEOUTS_PER_NODE = 3;

    @Test
    public void testHintsServiceMetrics() throws Exception
    {
        // start a 3-node cluster with a bytebuddy injection that makes the writing of some hints fail
        try (Cluster cluster = builder().withNodes(3)
                                        .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
                                        .withInstanceInitializer(FailHints::install)
                                        .start())
        {
            // drop some of the hint request messages sent by node1
            dropFirstHintRequests(cluster);

            // drop mutation requests from node1 on demand, so it creates hints for those mutations
            AtomicBoolean dropWritesForNode2 = new AtomicBoolean(false);
            AtomicBoolean dropWritesForNode3 = new AtomicBoolean(false);
            cluster.filters()
                   .verbs(Verb.MUTATION_REQ.id)
                   .from(1)
                   .messagesMatching((from, to, message) -> to == 2 ? dropWritesForNode2.get()
                                                                    : (to == 3 && dropWritesForNode3.get()))
                   .drop();

            // fix under replicated keyspaces so they don't produce hint requests while we are dropping mutations
            fixDistributedSchemas(cluster);
            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}"));
            cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int PRIMARY KEY, v int)"));

            ICoordinator coordinator = cluster.coordinator(1);
            IInvokableInstance node1 = cluster.get(1);
            IInvokableInstance node2 = cluster.get(2);
            IInvokableInstance node3 = cluster.get(3);

            // the first half of the rows is written while node2 misses its mutations, the second half while
            // node3 misses them, so hints are created for both nodes
            int halfway = NUM_ROWS / 2;
            writeRowsDroppingMutations(coordinator, dropWritesForNode2, 0, halfway);
            writeRowsDroppingMutations(coordinator, dropWritesForNode3, halfway, NUM_ROWS);

            // wait until all the hints have been successfully applied to the nodes that have been dropping mutations
            waitUntilAsserted(() -> assertThat(countRows(node2)).isEqualTo(countRows(node3)).isEqualTo(NUM_ROWS));

            // Verify the metrics for the coordinator node, which is the only one actually sending hints.
            // The injected delivery errors should have made the service try to send the hints again. Those
            // retries are done periodically and in pages, so they may resend hints that had already been
            // delivered. That is why there may be more succeeded hints than actual hints/rows.
            waitUntilAsserted(() -> assertThat(countHintsSucceeded(node1)).isGreaterThanOrEqualTo(NUM_ROWS));
            waitUntilAsserted(() -> assertThat(countHintsFailed(node1)).isEqualTo(NUM_FAILURES_PER_NODE * 2));
            waitUntilAsserted(() -> assertThat(countHintsTimedOut(node1)).isEqualTo(NUM_TIMEOUTS_PER_NODE * 2));

            assertCoordinatorDelayMetrics(node1, node2, node3);

            // verify that the metrics for the not-coordinator nodes are zero
            assertNoHintMetrics(cluster, node2);
            assertNoHintMetrics(cluster, node3);
        }
    }

    /**
     * Installs a message filter that drops the first {@link #NUM_TIMEOUTS_PER_NODE} hint requests sent by node 1
     * to node 2, and likewise the first ones sent to node 3. Each destination has its own counter.
     */
    private static void dropFirstHintRequests(Cluster cluster)
    {
        AtomicInteger hintsToNode2 = new AtomicInteger();
        AtomicInteger hintsToNode3 = new AtomicInteger();
        cluster.filters()
               .verbs(Verb.HINT_REQ.id)
               .from(1)
               .messagesMatching((from, to, message) -> {
                   if (to == 2)
                       return hintsToNode2.incrementAndGet() <= NUM_TIMEOUTS_PER_NODE;
                   if (to == 3)
                       return hintsToNode3.incrementAndGet() <= NUM_TIMEOUTS_PER_NODE;
                   return false;
               })
               .drop();
    }

    /**
     * Inserts the rows with keys in {@code [firstKey, endKey)} at {@code QUORUM} through the given coordinator,
     * keeping {@code dropWrites} set to {@code true} for the duration of the writes.
     */
    private static void writeRowsDroppingMutations(ICoordinator coordinator,
                                                   AtomicBoolean dropWrites,
                                                   int firstKey,
                                                   int endKey)
    {
        dropWrites.set(true);
        for (int key = firstKey; key < endKey; key++)
            coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (?, ?)"), QUORUM, key, key);
        dropWrites.set(false);
    }

    /**
     * Checks the hint delay histograms of the coordinator: the global one must have at least one entry per row,
     * there must be none for the coordinator itself, and the two per-endpoint ones must each be non-empty, no
     * larger than the global one, and together at least as large as the global one.
     */
    private static void assertCoordinatorDelayMetrics(IInvokableInstance coordinatorNode,
                                                      IInvokableInstance firstTarget,
                                                      IInvokableInstance secondTarget)
    {
        long globalDelays = countGlobalDelays(coordinatorNode);
        assertThat(globalDelays).isGreaterThanOrEqualTo(NUM_ROWS);
        assertThat(countEndpointDelays(coordinatorNode, coordinatorNode)).isEqualTo(0);
        assertThat(countEndpointDelays(coordinatorNode, firstTarget)).isGreaterThan(0).isLessThanOrEqualTo(globalDelays);
        assertThat(countEndpointDelays(coordinatorNode, secondTarget)).isGreaterThan(0).isLessThanOrEqualTo(globalDelays);
        assertThat(countEndpointDelays(coordinatorNode, firstTarget) + countEndpointDelays(coordinatorNode, secondTarget)).isGreaterThanOrEqualTo(globalDelays);
    }

    /**
     * Checks that the given node reports no succeeded, failed or timed out hints, and no hint delays, either
     * globally or towards any node of the cluster.
     */
    private static void assertNoHintMetrics(Cluster cluster, IInvokableInstance node)
    {
        assertThat(countHintsSucceeded(node)).isEqualTo(0);
        assertThat(countHintsFailed(node)).isEqualTo(0);
        assertThat(countHintsTimedOut(node)).isEqualTo(0);
        assertThat(countGlobalDelays(node)).isEqualTo(0);
        cluster.forEach(target -> assertThat(countEndpointDelays(node, target)).isEqualTo(0));
    }

    /**
     * Retries the given assertion once per second, starting immediately, until it passes or 5 minutes elapse.
     */
    private static void waitUntilAsserted(ThrowingRunnable assertion)
    {
        await().atMost(5, MINUTES)
               .pollDelay(0, SECONDS)
               .pollInterval(1, SECONDS)
               .untilAsserted(assertion);
    }

    /**
     * Returns the number of rows of the test table found by an internal query on the given node.
     */
    private static int countRows(IInvokableInstance node)
    {
        Object[][] rows = node.executeInternal(withKeyspace("SELECT * FROM %s.t"));
        return rows.length;
    }

    /**
     * Returns the count of {@link HintsServiceMetrics#hintsSucceeded} as read on the given node.
     */
    @SuppressWarnings("Convert2MethodRef")
    private static Long countHintsSucceeded(IInvokableInstance node)
    {
        return node.callOnInstance(() -> HintsServiceMetrics.hintsSucceeded.getCount());
    }

    /**
     * Returns the count of {@link HintsServiceMetrics#hintsFailed} as read on the given node.
     */
    @SuppressWarnings("Convert2MethodRef")
    private static Long countHintsFailed(IInvokableInstance node)
    {
        return node.callOnInstance(() -> HintsServiceMetrics.hintsFailed.getCount());
    }

    /**
     * Returns the count of {@link HintsServiceMetrics#hintsTimedOut} as read on the given node.
     */
    @SuppressWarnings("Convert2MethodRef")
    private static Long countHintsTimedOut(IInvokableInstance node)
    {
        return node.callOnInstance(() -> HintsServiceMetrics.hintsTimedOut.getCount());
    }

    /**
     * Returns the count of the global hint delays histogram of the given node.
     */
    private static Long countGlobalDelays(IInvokableInstance node)
    {
        String histogramName = "org.apache.cassandra.metrics.HintsService.Hint_delays";
        return getHistogramCount(node, histogramName);
    }

    /**
     * Returns the count of the hint delays histogram that {@code node} keeps for the endpoint {@code target}.
     * The histogram name is built from the broadcast address and port of the target.
     */
    private static Long countEndpointDelays(IInvokableInstance node, IInvokableInstance target)
    {
        String histogramName = String.format("org.apache.cassandra.metrics.HintsService.Hint_delays-%s.%d",
                                             target.broadcastAddress().getAddress(),
                                             target.broadcastAddress().getPort());
        return getHistogramCount(node, histogramName);
    }

    /**
     * Returns the rounded {@code COUNT} value of the histogram with exactly the given name on the given node,
     * or zero if the node doesn't report such a histogram.
     */
    private static long getHistogramCount(IInvokableInstance node, String name)
    {
        return node.metrics()
                   .getHistograms(metricName -> metricName.equals(name), Metrics.MetricValue.COUNT)
                   .values()
                   .stream()
                   .map(Math::round)
                   .findFirst()
                   .orElse(0L);
    }

    /**
     * Bytebuddy injection that makes the application of hints fail on the destination node.
     */
    public static class FailHints
    {
        /** Number of intercepted calls seen so far. */
        private static final AtomicInteger numHints = new AtomicInteger(0);

        /**
         * Rebases {@link Hint} in the given class loader so that its no-argument {@code applyFuture} method is
         * delegated to this class. Nothing is installed on node 1, the coordinator.
         */
        private static void install(ClassLoader cl, int nodeNumber)
        {
            if (nodeNumber != 1)
            {
                new ByteBuddy().rebase(Hint.class)
                               .method(named("applyFuture").and(takesArguments(0)))
                               .intercept(MethodDelegation.to(FailHints.class))
                               .make()
                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
            }
        }

        /**
         * Interceptor for the delegated method: throws for the first {@link #NUM_FAILURES_PER_NODE} calls and
         * forwards every later call to the original implementation.
         */
        public static CompletableFuture<?> execute(@SuperCall Callable<CompletableFuture<?>> r) throws Exception
        {
            int callNumber = numHints.incrementAndGet();
            if (callNumber > NUM_FAILURES_PER_NODE)
                return r.call();

            throw new RuntimeException("Injected failure");
        }
    }
}