blob: b0d7debe44a4f19e1c158b8be29d5bb8ce1df33f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.lib;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.apache.hadoop.test.MetricsAsserts.*;
import static org.mockito.AdditionalMatchers.eq;
import static org.mockito.AdditionalMatchers.geq;
import static org.mockito.AdditionalMatchers.leq;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.junit.Assert.*;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.util.Quantile;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test metrics record builder interface and mutable metrics
*/
public class TestMutableMetrics {
/** Logger for this test class; used to record the random seed of the multi-threaded test. */
private static final Logger LOG =
    LoggerFactory.getLogger(TestMutableMetrics.class);
/**
 * Absolute tolerance passed to the floating-point {@code eq(value, delta)}
 * matchers in {@code testSnapshot}. Declared {@code static} because it is a
 * true constant and does not need to be re-created for every test instance.
 */
private static final double EPSILON = 1e-42;
/**
 * Test the snapshot method.
 *
 * Registers one metric of each kind on a registry, snapshots it into a mocked
 * record builder and verifies the values that were published, including the
 * values reported for repeated snapshots of a single stat.
 */
@Test
public void testSnapshot() {
  final MetricsRecordBuilder builder = mockMetricsRecordBuilder();
  final MetricsRegistry metrics = new MetricsRegistry("test");

  // Populate the registry with counters, gauges, an extended stat and a rate.
  metrics.newCounter("c1", "int counter", 1);
  metrics.newCounter("c2", "long counter", 2L);
  metrics.newGauge("g1", "int gauge", 3);
  metrics.newGauge("g2", "long gauge", 4L);
  metrics.newGauge("g3", "float gauge", 5f);
  metrics.newStat("s1", "stat", "Ops", "Time", true).add(0);
  metrics.newRate("s2", "stat", false).add(0);
  metrics.snapshot(builder, true);

  final MutableStat rate = (MutableStat) metrics.get("s2");
  rate.snapshot(builder, true); // should get the same back.
  rate.add(1);
  rate.snapshot(builder, true); // should get new interval values back

  // Plain counters and gauges are published once with their initial values.
  verify(builder).addCounter(info("c1", "int counter"), 1);
  verify(builder).addCounter(info("c2", "long counter"), 2L);
  verify(builder).addGauge(info("g1", "int gauge"), 3);
  verify(builder).addGauge(info("g2", "long gauge"), 4L);
  verify(builder).addGauge(info("g3", "float gauge"), 5f);

  // s1 received a single sample of 0, so every time-based gauge is 0.0.
  verify(builder).addCounter(info("S1NumOps", "Number of ops for stat"), 1L);
  final String[][] zeroTimeGauges = {
      {"S1AvgTime", "Average time for stat"},
      {"S1StdevTime", "Standard deviation of time for stat"},
      {"S1IMinTime", "Interval min time for stat"},
      {"S1IMaxTime", "Interval max time for stat"},
      {"S1MinTime", "Min time for stat"},
      {"S1MaxTime", "Max time for stat"},
  };
  for (String[] gauge : zeroTimeGauges) {
    verify(builder).addGauge(eq(info(gauge[0], gauge[1])), eq(0.0, EPSILON));
  }
  verify(builder).addGauge(
      eq(info("S1INumOps", "Interval number of ops for stat")), eq(1L));

  // s2 was snapshotted twice before add(1) (same values both times) and
  // once afterwards (new values).
  verify(builder, times(2))
      .addCounter(info("S2NumOps", "Number of ops for stat"), 1L);
  verify(builder, times(2)).addGauge(
      eq(info("S2AvgTime", "Average time for stat")), eq(0.0, EPSILON));
  verify(builder).addCounter(info("S2NumOps", "Number of ops for stat"), 2L);
  verify(builder).addGauge(
      eq(info("S2AvgTime", "Average time for stat")), eq(1.0, EPSILON));

  // Add one more sample to s1 and verify that total number of ops
  // has increased to 2, but interval number is 1 for both intervals.
  final MutableStat stat = (MutableStat) metrics.get("s1");
  stat.add(0);
  metrics.snapshot(builder, true);
  verify(builder).addCounter(info("S1NumOps", "Number of ops for stat"), 2L);
  verify(builder, times(2)).addGauge(
      eq(info("S1INumOps", "Interval number of ops for stat")), eq(1L));
}
/**
 * Minimal protocol-style interface handed to the {@code init(Class)} methods
 * of the rates metrics under test. The tests that pass it in expect metrics
 * whose names start with {@code Foo} and {@code Bar}, matching the two
 * methods declared here.
 */
interface TestProtocol {
/** Placeholder operation; the tests assert on FooNumOps and FooAvgTime. */
void foo();
/** Placeholder operation; the tests assert on BarNumOps and BarAvgTime. */
void bar();
}
/**
 * Initializes a {@link MutableRates} from {@link TestProtocol} and expects the
 * registry snapshot to contain a zeroed op counter and average-time gauge for
 * each protocol method.
 */
@Test
public void testMutableRates() {
  final MetricsRecordBuilder builder = mockMetricsRecordBuilder();
  final MetricsRegistry metrics = new MetricsRegistry("test");
  new MutableRates(metrics).init(TestProtocol.class);
  metrics.snapshot(builder, false);

  // Same checks for both protocol methods, Foo first and then Bar.
  for (String method : new String[] {"Foo", "Bar"}) {
    assertCounter(method + "NumOps", 0L, builder);
    assertGauge(method + "AvgTime", 0.0, builder);
  }
}
/**
 * Initializes a {@link MutableRatesWithAggregation} from {@link TestProtocol}
 * and expects its own snapshot to contain a zeroed op counter and
 * average-time gauge for each protocol method.
 */
@Test
public void testMutableRatesWithAggregationInit() {
  final MutableRatesWithAggregation aggregated =
      new MutableRatesWithAggregation();
  aggregated.init(TestProtocol.class);

  final MetricsRecordBuilder builder = mockMetricsRecordBuilder();
  aggregated.snapshot(builder, false);

  // Same checks for both protocol methods, Foo first and then Bar.
  for (String method : new String[] {"Foo", "Bar"}) {
    assertCounter(method + "NumOps", 0L, builder);
    assertGauge(method + "AvgTime", 0.0, builder);
  }
}
/**
 * Adds samples to a {@link MutableRatesWithAggregation} from the test thread
 * only and checks the op counts and average times reported by two successive
 * snapshots.
 */
@Test
public void testMutableRatesWithAggregationSingleThread() {
  final MutableRatesWithAggregation aggregated =
      new MutableRatesWithAggregation();

  // First round: a single sample per metric.
  aggregated.add("foo", 1);
  aggregated.add("bar", 5);
  final MetricsRecordBuilder firstSnapshot = mockMetricsRecordBuilder();
  aggregated.snapshot(firstSnapshot, false);
  assertCounter("FooNumOps", 1L, firstSnapshot);
  assertGauge("FooAvgTime", 1.0, firstSnapshot);
  assertCounter("BarNumOps", 1L, firstSnapshot);
  assertGauge("BarAvgTime", 5.0, firstSnapshot);

  // Second round: the op counters keep growing across snapshots.
  aggregated.add("foo", 1);
  aggregated.add("foo", 3);
  aggregated.add("bar", 6);
  final MetricsRecordBuilder secondSnapshot = mockMetricsRecordBuilder();
  aggregated.snapshot(secondSnapshot, false);
  assertCounter("FooNumOps", 3L, secondSnapshot);
  assertGauge("FooAvgTime", 2.0, secondSnapshot);
  assertCounter("BarNumOps", 2L, secondSnapshot);
  assertGauge("BarAvgTime", 6.0, secondSnapshot);
}
/**
 * Adds samples to a {@link MutableRatesWithAggregation} from several worker
 * threads while the test thread snapshots it concurrently, then checks the
 * accumulated op counts and total times: once after every worker has finished
 * its first batch, and again after half of the workers have exited and the
 * other half have added a second batch.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *     sleeping or while waiting on one of the latches
 */
@Test
public void testMutableRatesWithAggregationManyThreads()
    throws InterruptedException {
  final MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
  final int n = 10;
  // Freshly allocated arrays are already zero-filled.
  long[] opCount = new long[n];
  double[] opTotalTime = new double[n];
  for (int i = 0; i < n; i++) {
    // Initialize so that the getLongCounter() method doesn't complain
    rates.add("metric" + i, 0);
  }

  Thread[] threads = new Thread[n];
  final CountDownLatch firstAddsFinished = new CountDownLatch(threads.length);
  final CountDownLatch firstSnapshotsFinished = new CountDownLatch(1);
  final CountDownLatch secondAddsFinished =
      new CountDownLatch(threads.length);
  final CountDownLatch secondSnapshotsFinished = new CountDownLatch(1);

  // Log the seed so that a failing interleaving can be investigated.
  long seed = new Random().nextLong();
  LOG.info("Random seed = " + seed);
  final Random sleepRandom = new Random(seed);

  for (int tIdx = 0; tIdx < threads.length; tIdx++) {
    final int threadIdx = tIdx;
    threads[threadIdx] = new Thread() {
      @Override
      public void run() {
        try {
          for (int i = 0; i < 1000; i++) {
            rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2);
            // Sleep so additions can be interleaved with snapshots
            Thread.sleep(sleepRandom.nextInt(5));
          }
          firstAddsFinished.countDown();
          // Make sure all threads stay alive long enough for the first
          // snapshot to complete; else their metrics may be lost to GC
          firstSnapshotsFinished.await();
          // Let half the threads continue with more metrics and let half die
          if (threadIdx % 2 == 0) {
            for (int i = 0; i < 1000; i++) {
              rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2);
            }
            secondAddsFinished.countDown();
            secondSnapshotsFinished.await();
          } else {
            secondAddsFinished.countDown();
          }
        } catch (InterruptedException e) {
          // The worker simply ends early, but keep the interrupt status
          // visible instead of silently discarding it.
          Thread.currentThread().interrupt();
        }
      }
    };
  }
  for (Thread t : threads) {
    t.start();
  }

  try {
    // Snapshot concurrently with additions but aggregate the totals into
    // opCount / opTotalTime
    for (int i = 0; i < 100; i++) {
      snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
      Thread.sleep(sleepRandom.nextInt(20));
    }
    firstAddsFinished.await();

    // Final snapshot to grab any remaining metrics and then verify that
    // the totals are as expected: per metric, 10 threads x 100 adds plus the
    // initializing add, and 10 threads x (50 * 1 + 50 * 2) time units.
    snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
    for (int i = 0; i < n; i++) {
      assertEquals("metric" + i + " count", 1001, opCount[i]);
      assertEquals("metric" + i + " total", 1500, opTotalTime[i], 1.0);
    }
    firstSnapshotsFinished.countDown();

    // After half of the threads die, ensure that the remaining ones still
    // add metrics correctly and that snapshot occurs correctly
    secondAddsFinished.await();
    snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
    for (int i = 0; i < n; i++) {
      assertEquals("metric" + i + " count", 1501, opCount[i]);
      assertEquals("metric" + i + " total", 2250, opTotalTime[i], 1.0);
    }
  } finally {
    // Always release the workers, even when an assertion above failed or the
    // test thread was interrupted; otherwise they would block forever in
    // await(). Counting down a latch that is already at zero has no effect.
    firstSnapshotsFinished.countDown();
    secondSnapshotsFinished.countDown();
  }
}
/**
 * Snapshots {@code rates} into a fresh mock builder and folds the result into
 * the caller's running totals for the metrics {@code Metric0 .. Metric(k-1)},
 * where {@code k} is {@code opCount.length}.
 *
 * @param rates the metrics object to snapshot
 * @param opCount per-metric op counts; each entry is overwritten with the
 *     counter value read from this snapshot
 * @param opTotalTime per-metric accumulated time; each entry grows by the
 *     snapshot's average time multiplied by the ops added since the
 *     previous call
 */
private static void snapshotMutableRatesWithAggregation(
    MutableRatesWithAggregation rates, long[] opCount, double[] opTotalTime) {
  final MetricsRecordBuilder builder = mockMetricsRecordBuilder();
  rates.snapshot(builder, true);
  int idx = 0;
  while (idx < opCount.length) {
    final String prefix = "Metric" + idx;
    final long totalOps = getLongCounter(prefix + "NumOps", builder);
    // Ops recorded since the previous snapshot of this metric.
    final long addedOps = totalOps - opCount[idx];
    opCount[idx] = totalOps;
    opTotalTime[idx] += getDoubleGauge(prefix + "AvgTime", builder) * addedOps;
    idx++;
  }
}
/**
 * Tests that when using {@link MutableStat#add(long, long)}, even with a high
 * sample count, the mean does not lose accuracy.
 */
@Test
public void testMutableStatWithBulkAdd() {
  final MetricsRegistry metrics = new MetricsRegistry("test");
  final MutableStat bulkStat =
      metrics.newStat("Test", "Test", "Ops", "Val", false);

  // Two bulk additions; the assertions below expect them to amount to
  // 2000 ops with a mean value of 1.5.
  bulkStat.add(1000, 1000);
  bulkStat.add(1000, 2000);

  final MetricsRecordBuilder builder = mockMetricsRecordBuilder();
  metrics.snapshot(builder, false);
  assertCounter("TestNumOps", 2000L, builder);
  assertGauge("TestAvgVal", 1.5, builder);
}
/**
 * Ensure that quantile estimates from {@link MutableQuantiles} are within
 * specified error bounds.
 *
 * Each value from 1 to 1000 is inserted twice, the test waits until 6 seconds
 * have passed since the inserts began (longer than the 5 second interval the
 * quantiles were created with), and every published percentile gauge is then
 * checked against {@code quantile * 1000} plus or minus
 * {@code error * 1000}.
 *
 * @throws Exception if the test thread is interrupted while sleeping
 */
@Test(timeout = 30000)
public void testMutableQuantilesError() throws Exception {
  MetricsRecordBuilder mb = mockMetricsRecordBuilder();
  MetricsRegistry registry = new MetricsRegistry("test");
  // Use a 5s rollover period
  MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops",
      "Latency", 5);
  // Push some values in and wait for it to publish
  long start = System.nanoTime() / 1000000;
  for (long i = 1; i <= 1000; i++) {
    quantiles.add(i);
    quantiles.add(1001 - i);
  }
  long end = System.nanoTime() / 1000000;
  // Thread.sleep rejects negative arguments, so clamp the remaining wait at
  // zero in case the inserts alone took longer than six seconds.
  Thread.sleep(Math.max(0L, 6000 - (end - start)));
  registry.snapshot(mb, false);
  // Print out the snapshot
  Map<Quantile, Long> previousSnapshot = quantiles.previousSnapshot;
  for (Entry<Quantile, Long> item : previousSnapshot.entrySet()) {
    System.out.println(String.format("Quantile %.2f has value %d",
        item.getKey().quantile, item.getValue()));
  }
  // Verify the results are within our requirements
  verify(mb).addGauge(
      info("FooNumOps", "Number of ops for stat with 5s interval"),
      (long) 2000);
  Quantile[] quants = MutableQuantiles.quantiles;
  String name = "Foo%dthPercentileLatency";
  String desc = "%d percentile latency with 5 second interval for stat";
  for (Quantile q : quants) {
    int percentile = (int) (100 * q.quantile);
    // Allowed absolute deviation, scaled to the 1..1000 value range.
    int error = (int) (1000 * q.error);
    String n = String.format(name, percentile);
    String d = String.format(desc, percentile);
    long expected = (long) (q.quantile * 1000);
    verify(mb).addGauge(eq(info(n, d)), leq(expected + error));
    verify(mb).addGauge(eq(info(n, d)), geq(expected - error));
  }
}
/**
 * Test that {@link MutableQuantiles} rolls the window over at the specified
 * interval.
 *
 * For each of three intervals the test inserts 1000 copies of the interval
 * index, waits until one second after the interval boundary, snapshots, and
 * expects every percentile gauge to equal that index.
 *
 * @throws Exception if the test thread is interrupted while sleeping
 */
@Test(timeout = 30000)
public void testMutableQuantilesRollover() throws Exception {
  MetricsRecordBuilder mb = mockMetricsRecordBuilder();
  MetricsRegistry registry = new MetricsRegistry("test");
  // Use a 5s rollover period
  MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops",
      "Latency", 5);
  Quantile[] quants = MutableQuantiles.quantiles;
  String name = "Foo%dthPercentileLatency";
  String desc = "%d percentile latency with 5 second interval for stat";
  // Push values for three intervals
  long start = System.nanoTime() / 1000000;
  for (int i = 1; i <= 3; i++) {
    // Insert the values
    for (long j = 1; j <= 1000; j++) {
      quantiles.add(i);
    }
    // Sleep until 1s after the next 5s interval, to let the metrics
    // roll over
    long sleep = (start + (5000 * i) + 1000) - (System.nanoTime() / 1000000);
    // Thread.sleep rejects negative arguments; if the target time has
    // already passed, continue immediately instead of failing with an
    // IllegalArgumentException.
    Thread.sleep(Math.max(0L, sleep));
    // Verify that the window reset, check it has the values we pushed in
    registry.snapshot(mb, false);
    for (Quantile q : quants) {
      int percentile = (int) (100 * q.quantile);
      String n = String.format(name, percentile);
      String d = String.format(desc, percentile);
      verify(mb).addGauge(info(n, d), (long) i);
    }
  }
  // Verify the metrics were added the right number of times
  verify(mb, times(3)).addGauge(
      info("FooNumOps", "Number of ops for stat with 5s interval"),
      (long) 1000);
  for (Quantile q : quants) {
    int percentile = (int) (100 * q.quantile);
    String n = String.format(name, percentile);
    String d = String.format(desc, percentile);
    verify(mb, times(3)).addGauge(eq(info(n, d)), anyLong());
  }
}
/**
 * Test that {@link MutableQuantiles} rolls over correctly even if no items
 * have been added to the window
 */
@Test(timeout = 30000)
public void testMutableQuantilesEmptyRollover() throws Exception {
  // Name and description of the op-count gauge checked before and after
  // the rollover.
  final String numOpsName = "FooNumOps";
  final String numOpsDesc = "Number of ops for stat with 5s interval";

  final MetricsRecordBuilder builder = mockMetricsRecordBuilder();
  final MetricsRegistry metrics = new MetricsRegistry("test");
  // Use a 5s rollover period
  final MutableQuantiles emptyQuantiles =
      metrics.newQuantiles("foo", "stat", "Ops", "Latency", 5);

  // Check it initially
  emptyQuantiles.snapshot(builder, true);
  verify(builder).addGauge(info(numOpsName, numOpsDesc), 0L);

  // Wait past the 5s interval; a second zero-valued gauge is expected.
  Thread.sleep(6000);
  emptyQuantiles.snapshot(builder, false);
  verify(builder, times(2)).addGauge(info(numOpsName, numOpsDesc), 0L);
}
}