| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.metrics2.lib; |
| |
| import static org.apache.hadoop.metrics2.lib.Interns.info; |
| import static org.apache.hadoop.test.MetricsAsserts.*; |
| import static org.mockito.AdditionalMatchers.eq; |
| import static org.mockito.AdditionalMatchers.geq; |
| import static org.mockito.AdditionalMatchers.leq; |
| import static org.mockito.Matchers.anyLong; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.junit.Assert.*; |
| |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| |
| import org.apache.hadoop.metrics2.MetricsRecordBuilder; |
| import org.apache.hadoop.metrics2.util.Quantile; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Test metrics record builder interface and mutable metrics |
| */ |
| public class TestMutableMetrics { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestMutableMetrics.class); |
| private final double EPSILON = 1e-42; |
| |
| /** |
| * Test the snapshot method |
| */ |
| @Test public void testSnapshot() { |
| MetricsRecordBuilder mb = mockMetricsRecordBuilder(); |
| |
| MetricsRegistry registry = new MetricsRegistry("test"); |
| registry.newCounter("c1", "int counter", 1); |
| registry.newCounter("c2", "long counter", 2L); |
| registry.newGauge("g1", "int gauge", 3); |
| registry.newGauge("g2", "long gauge", 4L); |
| registry.newGauge("g3", "float gauge", 5f); |
| registry.newStat("s1", "stat", "Ops", "Time", true).add(0); |
| registry.newRate("s2", "stat", false).add(0); |
| |
| registry.snapshot(mb, true); |
| |
| MutableStat s2 = (MutableStat) registry.get("s2"); |
| |
| s2.snapshot(mb, true); // should get the same back. |
| s2.add(1); |
| s2.snapshot(mb, true); // should get new interval values back |
| |
| verify(mb).addCounter(info("c1", "int counter"), 1); |
| verify(mb).addCounter(info("c2", "long counter"), 2L); |
| verify(mb).addGauge(info("g1", "int gauge"), 3); |
| verify(mb).addGauge(info("g2", "long gauge"), 4L); |
| verify(mb).addGauge(info("g3", "float gauge"), 5f); |
| verify(mb).addCounter(info("S1NumOps", "Number of ops for stat"), 1L); |
| verify(mb).addGauge(eq(info("S1AvgTime", "Average time for stat")), |
| eq(0.0, EPSILON)); |
| verify(mb).addGauge(eq(info("S1StdevTime", |
| "Standard deviation of time for stat")), |
| eq(0.0, EPSILON)); |
| verify(mb).addGauge(eq(info("S1IMinTime", |
| "Interval min time for stat")), |
| eq(0.0, EPSILON)); |
| verify(mb).addGauge(eq(info("S1IMaxTime", |
| "Interval max time for stat")), |
| eq(0.0, EPSILON)); |
| verify(mb).addGauge(eq(info("S1MinTime","Min time for stat")), |
| eq(0.0, EPSILON)); |
| verify(mb).addGauge(eq(info("S1MaxTime","Max time for stat")), |
| eq(0.0, EPSILON)); |
| verify(mb).addGauge( |
| eq(info("S1INumOps", "Interval number of ops for stat")), |
| eq(1L)); |
| |
| verify(mb, times(2)) |
| .addCounter(info("S2NumOps", "Number of ops for stat"), 1L); |
| verify(mb, times(2)).addGauge(eq(info("S2AvgTime", |
| "Average time for stat")), |
| eq(0.0, EPSILON)); |
| verify(mb).addCounter(info("S2NumOps", "Number of ops for stat"), 2L); |
| verify(mb).addGauge(eq(info("S2AvgTime", "Average time for stat")), |
| eq(1.0, EPSILON)); |
| |
| // Add one more sample to s1 and verify that total number of ops |
| // has increased to 2, but interval number is 1 for both intervals. |
| MutableStat s1 = (MutableStat) registry.get("s1"); |
| s1.add(0); |
| registry.snapshot(mb, true); |
| verify(mb).addCounter(info("S1NumOps", "Number of ops for stat"), 2L); |
| verify(mb, times(2)).addGauge( |
| eq(info("S1INumOps", "Interval number of ops for stat")), |
| eq(1L)); |
| } |
| |
| interface TestProtocol { |
| void foo(); |
| void bar(); |
| } |
| |
| @Test public void testMutableRates() { |
| MetricsRecordBuilder rb = mockMetricsRecordBuilder(); |
| MetricsRegistry registry = new MetricsRegistry("test"); |
| MutableRates rates = new MutableRates(registry); |
| |
| rates.init(TestProtocol.class); |
| registry.snapshot(rb, false); |
| |
| assertCounter("FooNumOps", 0L, rb); |
| assertGauge("FooAvgTime", 0.0, rb); |
| assertCounter("BarNumOps", 0L, rb); |
| assertGauge("BarAvgTime", 0.0, rb); |
| } |
| |
| @Test public void testMutableRatesWithAggregationInit() { |
| MetricsRecordBuilder rb = mockMetricsRecordBuilder(); |
| MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); |
| |
| rates.init(TestProtocol.class); |
| rates.snapshot(rb, false); |
| |
| assertCounter("FooNumOps", 0L, rb); |
| assertGauge("FooAvgTime", 0.0, rb); |
| assertCounter("BarNumOps", 0L, rb); |
| assertGauge("BarAvgTime", 0.0, rb); |
| } |
| |
| @Test public void testMutableRatesWithAggregationSingleThread() { |
| MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); |
| |
| rates.add("foo", 1); |
| rates.add("bar", 5); |
| |
| MetricsRecordBuilder rb = mockMetricsRecordBuilder(); |
| rates.snapshot(rb, false); |
| assertCounter("FooNumOps", 1L, rb); |
| assertGauge("FooAvgTime", 1.0, rb); |
| assertCounter("BarNumOps", 1L, rb); |
| assertGauge("BarAvgTime", 5.0, rb); |
| |
| rates.add("foo", 1); |
| rates.add("foo", 3); |
| rates.add("bar", 6); |
| |
| rb = mockMetricsRecordBuilder(); |
| rates.snapshot(rb, false); |
| assertCounter("FooNumOps", 3L, rb); |
| assertGauge("FooAvgTime", 2.0, rb); |
| assertCounter("BarNumOps", 2L, rb); |
| assertGauge("BarAvgTime", 6.0, rb); |
| } |
| |
| @Test public void testMutableRatesWithAggregationManyThreads() |
| throws InterruptedException { |
| final MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); |
| |
| final int n = 10; |
| long[] opCount = new long[n]; |
| double[] opTotalTime = new double[n]; |
| |
| for (int i = 0; i < n; i++) { |
| opCount[i] = 0; |
| opTotalTime[i] = 0; |
| // Initialize so that the getLongCounter() method doesn't complain |
| rates.add("metric" + i, 0); |
| } |
| |
| Thread[] threads = new Thread[n]; |
| final CountDownLatch firstAddsFinished = new CountDownLatch(threads.length); |
| final CountDownLatch firstSnapshotsFinished = new CountDownLatch(1); |
| final CountDownLatch secondAddsFinished = |
| new CountDownLatch(threads.length); |
| final CountDownLatch secondSnapshotsFinished = new CountDownLatch(1); |
| long seed = new Random().nextLong(); |
| LOG.info("Random seed = " + seed); |
| final Random sleepRandom = new Random(seed); |
| for (int tIdx = 0; tIdx < threads.length; tIdx++) { |
| final int threadIdx = tIdx; |
| threads[threadIdx] = new Thread() { |
| @Override |
| public void run() { |
| try { |
| for (int i = 0; i < 1000; i++) { |
| rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2); |
| // Sleep so additions can be interleaved with snapshots |
| Thread.sleep(sleepRandom.nextInt(5)); |
| } |
| firstAddsFinished.countDown(); |
| |
| // Make sure all threads stay alive long enough for the first |
| // snapshot to complete; else their metrics may be lost to GC |
| firstSnapshotsFinished.await(); |
| |
| // Let half the threads continue with more metrics and let half die |
| if (threadIdx % 2 == 0) { |
| for (int i = 0; i < 1000; i++) { |
| rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2); |
| } |
| secondAddsFinished.countDown(); |
| secondSnapshotsFinished.await(); |
| } else { |
| secondAddsFinished.countDown(); |
| } |
| } catch (InterruptedException e) { |
| // Ignore |
| } |
| } |
| }; |
| } |
| for (Thread t : threads) { |
| t.start(); |
| } |
| // Snapshot concurrently with additions but aggregate the totals into |
| // opCount / opTotalTime |
| for (int i = 0; i < 100; i++) { |
| snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime); |
| Thread.sleep(sleepRandom.nextInt(20)); |
| } |
| firstAddsFinished.await(); |
| // Final snapshot to grab any remaining metrics and then verify that |
| // the totals are as expected |
| snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime); |
| for (int i = 0; i < n; i++) { |
| assertEquals("metric" + i + " count", 1001, opCount[i]); |
| assertEquals("metric" + i + " total", 1500, opTotalTime[i], 1.0); |
| } |
| firstSnapshotsFinished.countDown(); |
| |
| // After half of the threads die, ensure that the remaining ones still |
| // add metrics correctly and that snapshot occurs correctly |
| secondAddsFinished.await(); |
| snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime); |
| for (int i = 0; i < n; i++) { |
| assertEquals("metric" + i + " count", 1501, opCount[i]); |
| assertEquals("metric" + i + " total", 2250, opTotalTime[i], 1.0); |
| } |
| secondSnapshotsFinished.countDown(); |
| } |
| |
| private static void snapshotMutableRatesWithAggregation( |
| MutableRatesWithAggregation rates, long[] opCount, double[] opTotalTime) { |
| MetricsRecordBuilder rb = mockMetricsRecordBuilder(); |
| rates.snapshot(rb, true); |
| for (int i = 0; i < opCount.length; i++) { |
| long prevOpCount = opCount[i]; |
| long newOpCount = getLongCounter("Metric" + i + "NumOps", rb); |
| opCount[i] = newOpCount; |
| double avgTime = getDoubleGauge("Metric" + i + "AvgTime", rb); |
| opTotalTime[i] += avgTime * (newOpCount - prevOpCount); |
| } |
| } |
| |
| /** |
| * Tests that when using {@link MutableStat#add(long, long)}, even with a high |
| * sample count, the mean does not lose accuracy. |
| */ |
| @Test public void testMutableStatWithBulkAdd() { |
| MetricsRecordBuilder rb = mockMetricsRecordBuilder(); |
| MetricsRegistry registry = new MetricsRegistry("test"); |
| MutableStat stat = registry.newStat("Test", "Test", "Ops", "Val", false); |
| |
| stat.add(1000, 1000); |
| stat.add(1000, 2000); |
| registry.snapshot(rb, false); |
| |
| assertCounter("TestNumOps", 2000L, rb); |
| assertGauge("TestAvgVal", 1.5, rb); |
| } |
| |
| /** |
| * Ensure that quantile estimates from {@link MutableQuantiles} are within |
| * specified error bounds. |
| */ |
| @Test(timeout = 30000) |
| public void testMutableQuantilesError() throws Exception { |
| MetricsRecordBuilder mb = mockMetricsRecordBuilder(); |
| MetricsRegistry registry = new MetricsRegistry("test"); |
| // Use a 5s rollover period |
| MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops", |
| "Latency", 5); |
| // Push some values in and wait for it to publish |
| long start = System.nanoTime() / 1000000; |
| for (long i = 1; i <= 1000; i++) { |
| quantiles.add(i); |
| quantiles.add(1001 - i); |
| } |
| long end = System.nanoTime() / 1000000; |
| |
| Thread.sleep(6000 - (end - start)); |
| |
| registry.snapshot(mb, false); |
| |
| // Print out the snapshot |
| Map<Quantile, Long> previousSnapshot = quantiles.previousSnapshot; |
| for (Entry<Quantile, Long> item : previousSnapshot.entrySet()) { |
| System.out.println(String.format("Quantile %.2f has value %d", |
| item.getKey().quantile, item.getValue())); |
| } |
| |
| // Verify the results are within our requirements |
| verify(mb).addGauge( |
| info("FooNumOps", "Number of ops for stat with 5s interval"), |
| (long) 2000); |
| Quantile[] quants = MutableQuantiles.quantiles; |
| String name = "Foo%dthPercentileLatency"; |
| String desc = "%d percentile latency with 5 second interval for stat"; |
| for (Quantile q : quants) { |
| int percentile = (int) (100 * q.quantile); |
| int error = (int) (1000 * q.error); |
| String n = String.format(name, percentile); |
| String d = String.format(desc, percentile); |
| long expected = (long) (q.quantile * 1000); |
| verify(mb).addGauge(eq(info(n, d)), leq(expected + error)); |
| verify(mb).addGauge(eq(info(n, d)), geq(expected - error)); |
| } |
| } |
| |
| /** |
| * Test that {@link MutableQuantiles} rolls the window over at the specified |
| * interval. |
| */ |
| @Test(timeout = 30000) |
| public void testMutableQuantilesRollover() throws Exception { |
| MetricsRecordBuilder mb = mockMetricsRecordBuilder(); |
| MetricsRegistry registry = new MetricsRegistry("test"); |
| // Use a 5s rollover period |
| MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops", |
| "Latency", 5); |
| |
| Quantile[] quants = MutableQuantiles.quantiles; |
| String name = "Foo%dthPercentileLatency"; |
| String desc = "%d percentile latency with 5 second interval for stat"; |
| |
| // Push values for three intervals |
| long start = System.nanoTime() / 1000000; |
| for (int i = 1; i <= 3; i++) { |
| // Insert the values |
| for (long j = 1; j <= 1000; j++) { |
| quantiles.add(i); |
| } |
| // Sleep until 1s after the next 5s interval, to let the metrics |
| // roll over |
| long sleep = (start + (5000 * i) + 1000) - (System.nanoTime() / 1000000); |
| Thread.sleep(sleep); |
| // Verify that the window reset, check it has the values we pushed in |
| registry.snapshot(mb, false); |
| for (Quantile q : quants) { |
| int percentile = (int) (100 * q.quantile); |
| String n = String.format(name, percentile); |
| String d = String.format(desc, percentile); |
| verify(mb).addGauge(info(n, d), (long) i); |
| } |
| } |
| |
| // Verify the metrics were added the right number of times |
| verify(mb, times(3)).addGauge( |
| info("FooNumOps", "Number of ops for stat with 5s interval"), |
| (long) 1000); |
| for (Quantile q : quants) { |
| int percentile = (int) (100 * q.quantile); |
| String n = String.format(name, percentile); |
| String d = String.format(desc, percentile); |
| verify(mb, times(3)).addGauge(eq(info(n, d)), anyLong()); |
| } |
| } |
| |
| /** |
| * Test that {@link MutableQuantiles} rolls over correctly even if no items |
| * have been added to the window |
| */ |
| @Test(timeout = 30000) |
| public void testMutableQuantilesEmptyRollover() throws Exception { |
| MetricsRecordBuilder mb = mockMetricsRecordBuilder(); |
| MetricsRegistry registry = new MetricsRegistry("test"); |
| // Use a 5s rollover period |
| MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops", |
| "Latency", 5); |
| |
| // Check it initially |
| quantiles.snapshot(mb, true); |
| verify(mb).addGauge( |
| info("FooNumOps", "Number of ops for stat with 5s interval"), (long) 0); |
| Thread.sleep(6000); |
| quantiles.snapshot(mb, false); |
| verify(mb, times(2)).addGauge( |
| info("FooNumOps", "Number of ops for stat with 5s interval"), (long) 0); |
| } |
| } |