blob: 0fc9bff174b8b8f29874298ab5dded62630f9081 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.distributed.test.metrics;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.Metrics;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.utils.concurrent.Future;
import org.awaitility.core.ThrowingRunnable;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.await;
* Tests {@link HintsServiceMetrics}.
public class HintsServiceMetricsTest extends TestBaseImpl
private static final int NUM_ROWS = 100;
private static final int NUM_FAILURES_PER_NODE = 5;
private static final int NUM_TIMEOUTS_PER_NODE = 3;
public void testHintsServiceMetrics() throws Exception
// setup a 3-node cluster with a bytebuddy injection that makes the writting of some hints to fail
try (Cluster cluster = builder().withNodes(3)
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
cluster.setUncaughtExceptionsFilter(t -> "Injected failure".equals(t.getMessage()));
// setup a message filter to drop some of the hint request messages from node1
AtomicInteger hintsNode2 = new AtomicInteger();
AtomicInteger hintsNode3 = new AtomicInteger();
.messagesMatching((from, to, message) ->
(to == 2 && hintsNode2.incrementAndGet() <= NUM_TIMEOUTS_PER_NODE) ||
(to == 3 && hintsNode3.incrementAndGet() <= NUM_TIMEOUTS_PER_NODE))
// setup a message filter to drop mutations requests from node1, so it creates hints for those mutations
AtomicBoolean dropWritesForNode2 = new AtomicBoolean(false);
AtomicBoolean dropWritesForNode3 = new AtomicBoolean(false);
.messagesMatching((from, to, message) ->
(to == 2 && dropWritesForNode2.get()) ||
(to == 3 && dropWritesForNode3.get()))
// fix under replicated keyspaces so they don't produce hint requests while we are dropping mutations
cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}"));
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int PRIMARY KEY, v int)"));
ICoordinator coordinator = cluster.coordinator(1);
IInvokableInstance node1 = cluster.get(1);
IInvokableInstance node2 = cluster.get(2);
IInvokableInstance node3 = cluster.get(3);
// write the first half of the rows with the second node dropping mutation requests,
// so some hints will be created for that node
for (int i = 0; i < NUM_ROWS / 2; i++)
coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (?, ?)"), QUORUM, i, i);
// write the second half of the rows with the third node dropping mutations requests,
// so some hints will be created for that node
for (int i = NUM_ROWS / 2; i < NUM_ROWS; i++)
coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (?, ?)"), QUORUM, i, i);
// wait until all the hints have been successfully applied to the nodes that have been dropping mutations
waitUntilAsserted(() -> assertThat(countRows(node2)).isEqualTo(countRows(node3)).isEqualTo(NUM_ROWS));
// Verify the metrics for the coordinator node, which is the only one actually sending hints.
// The hint delivery errors that we have injected should have made the service try to send them again.
// These retries are done periodically and in pages, so the retries may send again some of the hints that
// were already successfully sent. This way, there may be more succeeded hints than actual hints/rows.
waitUntilAsserted(() -> assertThat(countHintsSucceeded(node1)).isGreaterThanOrEqualTo(NUM_ROWS));
waitUntilAsserted(() -> assertThat(countHintsFailed(node1)).isEqualTo(NUM_FAILURES_PER_NODE * 2));
waitUntilAsserted(() -> assertThat(countHintsTimedOut(node1)).isEqualTo(NUM_TIMEOUTS_PER_NODE * 2));
// verify delay metrics
long numGlobalDelays = countGlobalDelays(node1);
assertThat(countEndpointDelays(node1, node1)).isEqualTo(0);
assertThat(countEndpointDelays(node1, node2)).isGreaterThan(0).isLessThanOrEqualTo(numGlobalDelays);
assertThat(countEndpointDelays(node1, node3)).isGreaterThan(0).isLessThanOrEqualTo(numGlobalDelays);
assertThat(countEndpointDelays(node1, node2) + countEndpointDelays(node1, node3)).isGreaterThanOrEqualTo(numGlobalDelays);
// verify that the metrics for the not-coordinator nodes are zero
for (IInvokableInstance node : Arrays.asList(node2, node3))
cluster.forEach(target -> assertThat(countEndpointDelays(node, target)).isEqualTo(0));
private static void waitUntilAsserted(ThrowingRunnable assertion)
await().atMost(5, MINUTES)
.pollDelay(0, SECONDS)
.pollInterval(1, SECONDS)
private static int countRows(IInvokableInstance node)
return node.executeInternal(withKeyspace("SELECT * FROM %s.t")).length;
private static Long countHintsSucceeded(IInvokableInstance node)
return node.callOnInstance(() -> HintsServiceMetrics.hintsSucceeded.getCount());
private static Long countHintsFailed(IInvokableInstance node)
return node.callOnInstance(() -> HintsServiceMetrics.hintsFailed.getCount());
private static Long countHintsTimedOut(IInvokableInstance node)
return node.callOnInstance(() -> HintsServiceMetrics.hintsTimedOut.getCount());
private static Long countGlobalDelays(IInvokableInstance node)
return getHistogramCount(node, "org.apache.cassandra.metrics.HintsService.Hint_delays");
private static Long countEndpointDelays(IInvokableInstance node, IInvokableInstance target)
return getHistogramCount(node, String.format("org.apache.cassandra.metrics.HintsService.Hint_delays-%s.%d",
private static long getHistogramCount(IInvokableInstance node, String name)
return node.metrics()
.getHistograms(s -> s.equals(name), Metrics.MetricValue.COUNT)
* Bytebuddy injection to make the application of hints to fail on the destination node.
public static class FailHints
private static final AtomicInteger numHints = new AtomicInteger(0);
private static void install(ClassLoader cl, int nodeNumber)
// we can ignore the coordinator node
if (nodeNumber == 1)
new ByteBuddy().rebase(Hint.class)
.load(cl, ClassLoadingStrategy.Default.INJECTION);
public static Future<?> execute(@SuperCall Callable<Future<?>> r) throws Exception
if (numHints.incrementAndGet() <= NUM_FAILURES_PER_NODE)
throw new RuntimeException("Injected failure");