| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.common.metrics; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Deque; |
| import java.util.List; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.ConcurrentLinkedDeque; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.kafka.common.Metric; |
| import org.apache.kafka.common.MetricName; |
| import org.apache.kafka.common.metrics.stats.Avg; |
| import org.apache.kafka.common.metrics.stats.Count; |
| import org.apache.kafka.common.metrics.stats.Max; |
| import org.apache.kafka.common.metrics.stats.Meter; |
| import org.apache.kafka.common.metrics.stats.Min; |
| import org.apache.kafka.common.metrics.stats.Percentile; |
| import org.apache.kafka.common.metrics.stats.Percentiles; |
| import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; |
| import org.apache.kafka.common.metrics.stats.Rate; |
| import org.apache.kafka.common.metrics.stats.SimpleRate; |
| import org.apache.kafka.common.metrics.stats.Sum; |
| import org.apache.kafka.common.metrics.stats.Total; |
| import org.apache.kafka.common.metrics.stats.Value; |
| import org.apache.kafka.common.utils.MockTime; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @SuppressWarnings("deprecation") |
| public class MetricsTest { |
| private static final Logger log = LoggerFactory.getLogger(MetricsTest.class); |
| |
| private static final double EPS = 0.000001; |
| private MockTime time = new MockTime(); |
| private MetricConfig config = new MetricConfig(); |
| private Metrics metrics; |
| private ExecutorService executorService; |
| |
| @Before |
| public void setup() { |
| this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if (executorService != null) { |
| executorService.shutdownNow(); |
| executorService.awaitTermination(5, TimeUnit.SECONDS); |
| } |
| this.metrics.close(); |
| } |
| |
| @Test |
| public void testMetricName() { |
| MetricName n1 = metrics.metricName("name", "group", "description", "key1", "value1", "key2", "value2"); |
| Map<String, String> tags = new HashMap<String, String>(); |
| tags.put("key1", "value1"); |
| tags.put("key2", "value2"); |
| MetricName n2 = metrics.metricName("name", "group", "description", tags); |
| assertEquals("metric names created in two different ways should be equal", n1, n2); |
| |
| try { |
| metrics.metricName("name", "group", "description", "key1"); |
| fail("Creating MetricName with an odd number of keyValue should fail"); |
| } catch (IllegalArgumentException e) { |
| // this is expected |
| } |
| } |
| |
| @Test |
| public void testSimpleStats() throws Exception { |
| ConstantMeasurable measurable = new ConstantMeasurable(); |
| |
| metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable); |
| Sensor s = metrics.sensor("test.sensor"); |
| s.add(metrics.metricName("test.avg", "grp1"), new Avg()); |
| s.add(metrics.metricName("test.max", "grp1"), new Max()); |
| s.add(metrics.metricName("test.min", "grp1"), new Min()); |
| s.add(new Meter(TimeUnit.SECONDS, metrics.metricName("test.rate", "grp1"), |
| metrics.metricName("test.total", "grp1"))); |
| s.add(new Meter(TimeUnit.SECONDS, new Count(), metrics.metricName("test.occurences", "grp1"), |
| metrics.metricName("test.occurences.total", "grp1"))); |
| s.add(metrics.metricName("test.count", "grp1"), new Count()); |
| s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT, |
| new Percentile(metrics.metricName("test.median", "grp1"), 50.0), |
| new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9))); |
| |
| Sensor s2 = metrics.sensor("test.sensor2"); |
| s2.add(metrics.metricName("s2.total", "grp1"), new Total()); |
| s2.record(5.0); |
| |
| int sum = 0; |
| int count = 10; |
| for (int i = 0; i < count; i++) { |
| s.record(i); |
| sum += i; |
| } |
| // prior to any time passing |
| double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0; |
| assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs, |
| metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS); |
| |
| // pretend 2 seconds passed... |
| long sleepTimeMs = 2; |
| time.sleep(sleepTimeMs * 1000); |
| elapsedSecs += sleepTimeMs; |
| |
| assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total", "grp1")).value(), EPS); |
| assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg", "grp1")).value(), EPS); |
| assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max", "grp1")).value(), EPS); |
| assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName("test.min", "grp1")).value(), EPS); |
| assertEquals("Rate(0...9) = 1.40625", |
| sum / elapsedSecs, metrics.metrics().get(metrics.metricName("test.rate", "grp1")).value(), EPS); |
| assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), |
| count / elapsedSecs, |
| metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS); |
| assertEquals("Count(0...9) = 10", |
| (double) count, metrics.metrics().get(metrics.metricName("test.count", "grp1")).value(), EPS); |
| } |
| |
| @Test |
| public void testHierarchicalSensors() { |
| Sensor parent1 = metrics.sensor("test.parent1"); |
| parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count()); |
| Sensor parent2 = metrics.sensor("test.parent2"); |
| parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count()); |
| Sensor child1 = metrics.sensor("test.child1", parent1, parent2); |
| child1.add(metrics.metricName("test.child1.count", "grp1"), new Count()); |
| Sensor child2 = metrics.sensor("test.child2", parent1); |
| child2.add(metrics.metricName("test.child2.count", "grp1"), new Count()); |
| Sensor grandchild = metrics.sensor("test.grandchild", child1); |
| grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new Count()); |
| |
| /* increment each sensor one time */ |
| parent1.record(); |
| parent2.record(); |
| child1.record(); |
| child2.record(); |
| grandchild.record(); |
| |
| double p1 = parent1.metrics().get(0).value(); |
| double p2 = parent2.metrics().get(0).value(); |
| double c1 = child1.metrics().get(0).value(); |
| double c2 = child2.metrics().get(0).value(); |
| double gc = grandchild.metrics().get(0).value(); |
| |
| /* each metric should have a count equal to one + its children's count */ |
| assertEquals(1.0, gc, EPS); |
| assertEquals(1.0 + gc, c1, EPS); |
| assertEquals(1.0, c2, EPS); |
| assertEquals(1.0 + c1, p2, EPS); |
| assertEquals(1.0 + c1 + c2, p1, EPS); |
| assertEquals(Arrays.asList(child1, child2), metrics.childrenSensors().get(parent1)); |
| assertEquals(Arrays.asList(child1), metrics.childrenSensors().get(parent2)); |
| assertNull(metrics.childrenSensors().get(grandchild)); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testBadSensorHierarchy() { |
| Sensor p = metrics.sensor("parent"); |
| Sensor c1 = metrics.sensor("child1", p); |
| Sensor c2 = metrics.sensor("child2", p); |
| metrics.sensor("gc", c1, c2); // should fail |
| } |
| |
| @Test |
| public void testRemoveSensor() { |
| int size = metrics.metrics().size(); |
| Sensor parent1 = metrics.sensor("test.parent1"); |
| parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count()); |
| Sensor parent2 = metrics.sensor("test.parent2"); |
| parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count()); |
| Sensor child1 = metrics.sensor("test.child1", parent1, parent2); |
| child1.add(metrics.metricName("test.child1.count", "grp1"), new Count()); |
| Sensor child2 = metrics.sensor("test.child2", parent2); |
| child2.add(metrics.metricName("test.child2.count", "grp1"), new Count()); |
| Sensor grandChild1 = metrics.sensor("test.gchild2", child2); |
| grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new Count()); |
| |
| Sensor sensor = metrics.getSensor("test.parent1"); |
| assertNotNull(sensor); |
| metrics.removeSensor("test.parent1"); |
| assertNull(metrics.getSensor("test.parent1")); |
| assertNull(metrics.metrics().get(metrics.metricName("test.parent1.count", "grp1"))); |
| assertNull(metrics.getSensor("test.child1")); |
| assertNull(metrics.childrenSensors().get(sensor)); |
| assertNull(metrics.metrics().get(metrics.metricName("test.child1.count", "grp1"))); |
| |
| sensor = metrics.getSensor("test.gchild2"); |
| assertNotNull(sensor); |
| metrics.removeSensor("test.gchild2"); |
| assertNull(metrics.getSensor("test.gchild2")); |
| assertNull(metrics.childrenSensors().get(sensor)); |
| assertNull(metrics.metrics().get(metrics.metricName("test.gchild2.count", "grp1"))); |
| |
| sensor = metrics.getSensor("test.child2"); |
| assertNotNull(sensor); |
| metrics.removeSensor("test.child2"); |
| assertNull(metrics.getSensor("test.child2")); |
| assertNull(metrics.childrenSensors().get(sensor)); |
| assertNull(metrics.metrics().get(metrics.metricName("test.child2.count", "grp1"))); |
| |
| sensor = metrics.getSensor("test.parent2"); |
| assertNotNull(sensor); |
| metrics.removeSensor("test.parent2"); |
| assertNull(metrics.getSensor("test.parent2")); |
| assertNull(metrics.childrenSensors().get(sensor)); |
| assertNull(metrics.metrics().get(metrics.metricName("test.parent2.count", "grp1"))); |
| |
| assertEquals(size, metrics.metrics().size()); |
| } |
| |
| @Test |
| public void testRemoveInactiveMetrics() { |
| Sensor s1 = metrics.sensor("test.s1", null, 1); |
| s1.add(metrics.metricName("test.s1.count", "grp1"), new Count()); |
| |
| Sensor s2 = metrics.sensor("test.s2", null, 3); |
| s2.add(metrics.metricName("test.s2.count", "grp1"), new Count()); |
| |
| Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask(); |
| purger.run(); |
| assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1")); |
| assertNotNull("MetricName test.s1.count must be present", |
| metrics.metrics().get(metrics.metricName("test.s1.count", "grp1"))); |
| assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2")); |
| assertNotNull("MetricName test.s2.count must be present", |
| metrics.metrics().get(metrics.metricName("test.s2.count", "grp1"))); |
| |
| time.sleep(1001); |
| purger.run(); |
| assertNull("Sensor test.s1 should have been purged", metrics.getSensor("test.s1")); |
| assertNull("MetricName test.s1.count should have been purged", |
| metrics.metrics().get(metrics.metricName("test.s1.count", "grp1"))); |
| assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2")); |
| assertNotNull("MetricName test.s2.count must be present", |
| metrics.metrics().get(metrics.metricName("test.s2.count", "grp1"))); |
| |
| // record a value in sensor s2. This should reset the clock for that sensor. |
| // It should not get purged at the 3 second mark after creation |
| s2.record(); |
| time.sleep(2000); |
| purger.run(); |
| assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2")); |
| assertNotNull("MetricName test.s2.count must be present", |
| metrics.metrics().get(metrics.metricName("test.s2.count", "grp1"))); |
| |
| // After another 1 second sleep, the metric should be purged |
| time.sleep(1000); |
| purger.run(); |
| assertNull("Sensor test.s2 should have been purged", metrics.getSensor("test.s1")); |
| assertNull("MetricName test.s2.count should have been purged", |
| metrics.metrics().get(metrics.metricName("test.s1.count", "grp1"))); |
| |
| // After purging, it should be possible to recreate a metric |
| s1 = metrics.sensor("test.s1", null, 1); |
| s1.add(metrics.metricName("test.s1.count", "grp1"), new Count()); |
| assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1")); |
| assertNotNull("MetricName test.s1.count must be present", |
| metrics.metrics().get(metrics.metricName("test.s1.count", "grp1"))); |
| } |
| |
| @Test |
| public void testRemoveMetric() { |
| int size = metrics.metrics().size(); |
| metrics.addMetric(metrics.metricName("test1", "grp1"), new Count()); |
| metrics.addMetric(metrics.metricName("test2", "grp1"), new Count()); |
| |
| assertNotNull(metrics.removeMetric(metrics.metricName("test1", "grp1"))); |
| assertNull(metrics.metrics().get(metrics.metricName("test1", "grp1"))); |
| assertNotNull(metrics.metrics().get(metrics.metricName("test2", "grp1"))); |
| |
| assertNotNull(metrics.removeMetric(metrics.metricName("test2", "grp1"))); |
| assertNull(metrics.metrics().get(metrics.metricName("test2", "grp1"))); |
| |
| assertEquals(size, metrics.metrics().size()); |
| } |
| |
| @Test |
| public void testEventWindowing() { |
| Count count = new Count(); |
| MetricConfig config = new MetricConfig().eventWindow(1).samples(2); |
| count.record(config, 1.0, time.milliseconds()); |
| count.record(config, 1.0, time.milliseconds()); |
| assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); |
| count.record(config, 1.0, time.milliseconds()); // first event times out |
| assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); |
| } |
| |
| @Test |
| public void testTimeWindowing() { |
| Count count = new Count(); |
| MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2); |
| count.record(config, 1.0, time.milliseconds()); |
| time.sleep(1); |
| count.record(config, 1.0, time.milliseconds()); |
| assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); |
| time.sleep(1); |
| count.record(config, 1.0, time.milliseconds()); // oldest event times out |
| assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); |
| } |
| |
| @Test |
| public void testOldDataHasNoEffect() { |
| Max max = new Max(); |
| long windowMs = 100; |
| int samples = 2; |
| MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples); |
| max.record(config, 50, time.milliseconds()); |
| time.sleep(samples * windowMs); |
| assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS); |
| } |
| |
| @Test |
| public void testSampledStatInitialValue() { |
| // initialValue from each SampledStat is set as the initialValue on its Sample. |
| // The only way to test the initialValue is to infer it by having a SampledStat |
| // with expired Stats, because their values are reset to the initial values. |
| // Most implementations of combine on SampledStat end up returning the default |
| // value, so we can use this. This doesn't work for Percentiles though. |
| // This test looks a lot like testOldDataHasNoEffect because it's the same |
| // flow that leads to this state. |
| Max max = new Max(); |
| Min min = new Min(); |
| Avg avg = new Avg(); |
| Count count = new Count(); |
| Rate.SampledTotal sampledTotal = new Rate.SampledTotal(); |
| |
| long windowMs = 100; |
| int samples = 2; |
| MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples); |
| max.record(config, 50, time.milliseconds()); |
| min.record(config, 50, time.milliseconds()); |
| avg.record(config, 50, time.milliseconds()); |
| count.record(config, 50, time.milliseconds()); |
| sampledTotal.record(config, 50, time.milliseconds()); |
| time.sleep(samples * windowMs); |
| |
| assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS); |
| assertEquals(Double.MAX_VALUE, min.measure(config, time.milliseconds()), EPS); |
| assertEquals(0.0, avg.measure(config, time.milliseconds()), EPS); |
| assertEquals(0, count.measure(config, time.milliseconds()), EPS); |
| assertEquals(0.0, sampledTotal.measure(config, time.milliseconds()), EPS); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testDuplicateMetricName() { |
| metrics.sensor("test").add(metrics.metricName("test", "grp1"), new Avg()); |
| metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new Total()); |
| } |
| |
| @Test |
| public void testQuotas() { |
| Sensor sensor = metrics.sensor("test"); |
| sensor.add(metrics.metricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0))); |
| sensor.add(metrics.metricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0))); |
| sensor.record(5.0); |
| try { |
| sensor.record(1.0); |
| fail("Should have gotten a quota violation."); |
| } catch (QuotaViolationException e) { |
| // this is good |
| } |
| assertEquals(6.0, metrics.metrics().get(metrics.metricName("test1.total", "grp1")).value(), EPS); |
| sensor.record(-6.0); |
| try { |
| sensor.record(-1.0); |
| fail("Should have gotten a quota violation."); |
| } catch (QuotaViolationException e) { |
| // this is good |
| } |
| } |
| |
| @Test |
| public void testQuotasEquality() { |
| final Quota quota1 = Quota.upperBound(10.5); |
| final Quota quota2 = Quota.lowerBound(10.5); |
| |
| assertFalse("Quota with different upper values shouldn't be equal", quota1.equals(quota2)); |
| |
| final Quota quota3 = Quota.lowerBound(10.5); |
| |
| assertTrue("Quota with same upper and bound values should be equal", quota2.equals(quota3)); |
| } |
| |
| @Test |
| public void testPercentiles() { |
| int buckets = 100; |
| Percentiles percs = new Percentiles(4 * buckets, |
| 0.0, |
| 100.0, |
| BucketSizing.CONSTANT, |
| new Percentile(metrics.metricName("test.p25", "grp1"), 25), |
| new Percentile(metrics.metricName("test.p50", "grp1"), 50), |
| new Percentile(metrics.metricName("test.p75", "grp1"), 75)); |
| MetricConfig config = new MetricConfig().eventWindow(50).samples(2); |
| Sensor sensor = metrics.sensor("test", config); |
| sensor.add(percs); |
| Metric p25 = this.metrics.metrics().get(metrics.metricName("test.p25", "grp1")); |
| Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1")); |
| Metric p75 = this.metrics.metrics().get(metrics.metricName("test.p75", "grp1")); |
| |
| // record two windows worth of sequential values |
| for (int i = 0; i < buckets; i++) |
| sensor.record(i); |
| |
| assertEquals(25, p25.value(), 1.0); |
| assertEquals(50, p50.value(), 1.0); |
| assertEquals(75, p75.value(), 1.0); |
| |
| for (int i = 0; i < buckets; i++) |
| sensor.record(0.0); |
| |
| assertEquals(0.0, p25.value(), 1.0); |
| assertEquals(0.0, p50.value(), 1.0); |
| assertEquals(0.0, p75.value(), 1.0); |
| |
| // record two more windows worth of sequential values |
| for (int i = 0; i < buckets; i++) |
| sensor.record(i); |
| |
| assertEquals(25, p25.value(), 1.0); |
| assertEquals(50, p50.value(), 1.0); |
| assertEquals(75, p75.value(), 1.0); |
| } |
| |
| @Test |
| public void testRateWindowing() throws Exception { |
| // Use the default time window. Set 3 samples |
| MetricConfig cfg = new MetricConfig().samples(3); |
| Sensor s = metrics.sensor("test.sensor", cfg); |
| MetricName rateMetricName = metrics.metricName("test.rate", "grp1"); |
| MetricName totalMetricName = metrics.metricName("test.total", "grp1"); |
| s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName)); |
| KafkaMetric totalMetric = metrics.metrics().get(metrics.metricName("test.total", "grp1")); |
| |
| int sum = 0; |
| int count = cfg.samples() - 1; |
| // Advance 1 window after every record |
| for (int i = 0; i < count; i++) { |
| s.record(100); |
| sum += 100; |
| time.sleep(cfg.timeWindowMs()); |
| assertEquals(sum, totalMetric.value(), EPS); |
| } |
| |
| // Sleep for half the window. |
| time.sleep(cfg.timeWindowMs() / 2); |
| |
| // prior to any time passing |
| double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0; |
| |
| KafkaMetric rateMetric = metrics.metrics().get(metrics.metricName("test.rate", "grp1")); |
| assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, rateMetric.value(), EPS); |
| assertEquals("Elapsed Time = 75 seconds", elapsedSecs, |
| ((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS); |
| assertEquals(sum, totalMetric.value(), EPS); |
| } |
| |
| public static class ConstantMeasurable implements Measurable { |
| public double value = 0.0; |
| |
| @Override |
| public double measure(MetricConfig config, long now) { |
| return value; |
| } |
| |
| } |
| |
| @Test |
| public void testSimpleRate() { |
| SimpleRate rate = new SimpleRate(); |
| |
| //Given |
| MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.SECONDS).samples(10); |
| |
| //In the first window the rate is a fraction of the whole (1s) window |
| //So when we record 1000 at t0, the rate should be 1000 until the window completes, or more data is recorded. |
| record(rate, config, 1000); |
| assertEquals(1000, measure(rate, config), 0); |
| time.sleep(100); |
| assertEquals(1000, measure(rate, config), 0); // 1000B / 0.1s |
| time.sleep(100); |
| assertEquals(1000, measure(rate, config), 0); // 1000B / 0.2s |
| time.sleep(200); |
| assertEquals(1000, measure(rate, config), 0); // 1000B / 0.4s |
| |
| //In the second (and subsequent) window(s), the rate will be in proportion to the elapsed time |
| //So the rate will degrade over time, as the time between measurement and the initial recording grows. |
| time.sleep(600); |
| assertEquals(1000, measure(rate, config), 0); // 1000B / 1.0s |
| time.sleep(200); |
| assertEquals(1000 / 1.2, measure(rate, config), 0); // 1000B / 1.2s |
| time.sleep(200); |
| assertEquals(1000 / 1.4, measure(rate, config), 0); // 1000B / 1.4s |
| |
| //Adding another value, inside the same window should double the rate |
| record(rate, config, 1000); |
| assertEquals(2000 / 1.4, measure(rate, config), 0); // 2000B / 1.4s |
| |
| //Going over the next window, should not change behaviour |
| time.sleep(1100); |
| assertEquals(2000 / 2.5, measure(rate, config), 0); // 2000B / 2.5s |
| record(rate, config, 1000); |
| assertEquals(3000 / 2.5, measure(rate, config), 0); // 3000B / 2.5s |
| |
| //Sleeping for another 6.5 windows also should be the same |
| time.sleep(6500); |
| assertEquals(3000 / 9, measure(rate, config), 1); // 3000B / 9s |
| record(rate, config, 1000); |
| assertEquals(4000 / 9, measure(rate, config), 1); // 4000B / 9s |
| |
| //Going over the 10 window boundary should cause the first window's values (1000) will be purged. |
| //So the rate is calculated based on the oldest reading, which is inside the second window, at 1.4s |
| time.sleep(1500); |
| assertEquals((4000 - 1000) / (10.5 - 1.4), measure(rate, config), 1); |
| record(rate, config, 1000); |
| assertEquals((5000 - 1000) / (10.5 - 1.4), measure(rate, config), 1); |
| } |
| |
| private void record(Rate rate, MetricConfig config, int value) { |
| rate.record(config, value, time.milliseconds()); |
| } |
| |
| private Double measure(Measurable rate, MetricConfig config) { |
| return rate.measure(config, time.milliseconds()); |
| } |
| |
| @Test |
| public void testMetricInstances() { |
| MetricName n1 = metrics.metricInstance(SampleMetrics.METRIC1, "key1", "value1", "key2", "value2"); |
| Map<String, String> tags = new HashMap<String, String>(); |
| tags.put("key1", "value1"); |
| tags.put("key2", "value2"); |
| MetricName n2 = metrics.metricInstance(SampleMetrics.METRIC2, tags); |
| assertEquals("metric names created in two different ways should be equal", n1, n2); |
| |
| try { |
| metrics.metricInstance(SampleMetrics.METRIC1, "key1"); |
| fail("Creating MetricName with an odd number of keyValue should fail"); |
| } catch (IllegalArgumentException e) { |
| // this is expected |
| } |
| |
| Map<String, String> parentTagsWithValues = new HashMap<>(); |
| parentTagsWithValues.put("parent-tag", "parent-tag-value"); |
| |
| Map<String, String> childTagsWithValues = new HashMap<>(); |
| childTagsWithValues.put("child-tag", "child-tag-value"); |
| |
| try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList((MetricsReporter) new JmxReporter()), time, true)) { |
| MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues); |
| |
| Map<String, String> filledOutTags = inheritedMetric.tags(); |
| assertEquals("parent-tag should be set properly", filledOutTags.get("parent-tag"), "parent-tag-value"); |
| assertEquals("child-tag should be set properly", filledOutTags.get("child-tag"), "child-tag-value"); |
| |
| try { |
| inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, parentTagsWithValues); |
| fail("Creating MetricName should fail if the child metrics are not defined at runtime"); |
| } catch (IllegalArgumentException e) { |
| // this is expected |
| } |
| |
| try { |
| |
| Map<String, String> runtimeTags = new HashMap<>(); |
| runtimeTags.put("child-tag", "child-tag-value"); |
| runtimeTags.put("tag-not-in-template", "unexpected-value"); |
| |
| inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, runtimeTags); |
| fail("Creating MetricName should fail if there is a tag at runtime that is not in the template"); |
| } catch (IllegalArgumentException e) { |
| // this is expected |
| } |
| } |
| } |
| |
| /** |
| * Verifies that concurrent sensor add, remove, updates and read don't result |
| * in errors or deadlock. |
| */ |
| @Test |
| public void testConcurrentReadUpdate() throws Exception { |
| final Random random = new Random(); |
| final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>(); |
| metrics = new Metrics(new MockTime(10)); |
| SensorCreator sensorCreator = new SensorCreator(metrics); |
| |
| final AtomicBoolean alive = new AtomicBoolean(true); |
| executorService = Executors.newSingleThreadExecutor(); |
| executorService.submit(new ConcurrentMetricOperation(alive, "record", new Runnable() { |
| @Override |
| public void run() { |
| while (alive.get()) { |
| for (Sensor sensor : sensors) { |
| sensor.record(random.nextInt(10000)); |
| } |
| } |
| } |
| })); |
| |
| for (int i = 0; i < 10000; i++) { |
| if (sensors.size() > 5) { |
| Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast(); |
| metrics.removeSensor(sensor.name()); |
| } |
| StatType statType = StatType.forId(random.nextInt(StatType.values().length)); |
| sensors.add(sensorCreator.createSensor(statType, i)); |
| for (Sensor sensor : sensors) { |
| for (KafkaMetric metric : sensor.metrics()) { |
| assertNotNull("Invalid metric value", metric.metricValue()); |
| } |
| } |
| } |
| alive.set(false); |
| } |
| |
| /** |
| * Verifies that concurrent sensor add, remove, updates and read with a metrics reporter |
| * that synchronizes on every reporter method doesn't result in errors or deadlock. |
| */ |
| @Test |
| public void testConcurrentReadUpdateReport() throws Exception { |
| |
| class LockingReporter implements MetricsReporter { |
| Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>(); |
| @Override |
| public synchronized void init(List<KafkaMetric> metrics) { |
| } |
| |
| @Override |
| public synchronized void metricChange(KafkaMetric metric) { |
| activeMetrics.put(metric.metricName(), metric); |
| } |
| |
| @Override |
| public synchronized void metricRemoval(KafkaMetric metric) { |
| activeMetrics.remove(metric.metricName()); |
| } |
| |
| @Override |
| public synchronized void close() { |
| } |
| |
| @Override |
| public void configure(Map<String, ?> configs) { |
| } |
| |
| synchronized void processMetrics() { |
| for (KafkaMetric metric : activeMetrics.values()) { |
| assertNotNull("Invalid metric value", metric.metricValue()); |
| } |
| } |
| } |
| |
| final LockingReporter reporter = new LockingReporter(); |
| this.metrics.close(); |
| this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) reporter), new MockTime(10), true); |
| final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>(); |
| SensorCreator sensorCreator = new SensorCreator(metrics); |
| |
| final Random random = new Random(); |
| final AtomicBoolean alive = new AtomicBoolean(true); |
| executorService = Executors.newFixedThreadPool(3); |
| |
| Future<?> writeFuture = executorService.submit(new ConcurrentMetricOperation(alive, "record", new Runnable() { |
| @Override |
| public void run() { |
| while (alive.get()) { |
| for (Sensor sensor : sensors) { |
| sensor.record(random.nextInt(10000)); |
| } |
| } |
| } |
| })); |
| Future<?> readFuture = executorService.submit(new ConcurrentMetricOperation(alive, "read", new Runnable() { |
| @Override |
| public void run() { |
| while (alive.get()) { |
| for (Sensor sensor : sensors) { |
| for (Metric metric : sensor.metrics()) { |
| assertNotNull("Invalid metric value", metric.metricValue()); |
| } |
| } |
| } |
| } |
| })); |
| Future<?> reportFuture = executorService.submit(new ConcurrentMetricOperation(alive, "report", new Runnable() { |
| @Override |
| public void run() { |
| reporter.processMetrics(); |
| } |
| })); |
| |
| for (int i = 0; i < 10000; i++) { |
| if (sensors.size() > 10) { |
| Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast(); |
| metrics.removeSensor(sensor.name()); |
| } |
| StatType statType = StatType.forId(random.nextInt(StatType.values().length)); |
| sensors.add(sensorCreator.createSensor(statType, i)); |
| } |
| assertFalse("Read failed", readFuture.isDone()); |
| assertFalse("Write failed", writeFuture.isDone()); |
| assertFalse("Report failed", reportFuture.isDone()); |
| |
| alive.set(false); |
| } |
| |
| private class ConcurrentMetricOperation implements Runnable { |
| private final AtomicBoolean alive; |
| private final String opName; |
| private final Runnable op; |
| ConcurrentMetricOperation(AtomicBoolean alive, String opName, Runnable op) { |
| this.alive = alive; |
| this.opName = opName; |
| this.op = op; |
| } |
| public void run() { |
| try { |
| while (alive.get()) { |
| op.run(); |
| } |
| } catch (Throwable t) { |
| log.error("Metric {} failed with exception", opName, t); |
| } |
| } |
| } |
| |
| enum StatType { |
| AVG(0), |
| TOTAL(1), |
| COUNT(2), |
| MAX(3), |
| MIN(4), |
| RATE(5), |
| SIMPLE_RATE(6), |
| SUM(7), |
| VALUE(8), |
| PERCENTILES(9), |
| METER(10); |
| |
| int id; |
| StatType(int id) { |
| this.id = id; |
| } |
| |
| static StatType forId(int id) { |
| for (StatType statType : StatType.values()) { |
| if (statType.id == id) |
| return statType; |
| } |
| return null; |
| } |
| } |
| |
| private static class SensorCreator { |
| |
| private final Metrics metrics; |
| |
| SensorCreator(Metrics metrics) { |
| this.metrics = metrics; |
| } |
| |
| private Sensor createSensor(StatType statType, int index) { |
| Sensor sensor = metrics.sensor("kafka.requests." + index); |
| Map<String, String> tags = Collections.singletonMap("tag", "tag" + index); |
| switch (statType) { |
| case AVG: |
| sensor.add(metrics.metricName("test.metric.avg", "avg", tags), new Avg()); |
| break; |
| case TOTAL: |
| sensor.add(metrics.metricName("test.metric.total", "total", tags), new Total()); |
| break; |
| case COUNT: |
| sensor.add(metrics.metricName("test.metric.count", "count", tags), new Count()); |
| break; |
| case MAX: |
| sensor.add(metrics.metricName("test.metric.max", "max", tags), new Max()); |
| break; |
| case MIN: |
| sensor.add(metrics.metricName("test.metric.min", "min", tags), new Min()); |
| break; |
| case RATE: |
| sensor.add(metrics.metricName("test.metric.rate", "rate", tags), new Rate()); |
| break; |
| case SIMPLE_RATE: |
| sensor.add(metrics.metricName("test.metric.simpleRate", "simpleRate", tags), new SimpleRate()); |
| break; |
| case SUM: |
| sensor.add(metrics.metricName("test.metric.sum", "sum", tags), new Sum()); |
| break; |
| case VALUE: |
| sensor.add(metrics.metricName("test.metric.value", "value", tags), new Value()); |
| break; |
| case PERCENTILES: |
| sensor.add(metrics.metricName("test.metric.percentiles", "percentiles", tags), |
| new Percentiles(100, -100, 100, Percentiles.BucketSizing.CONSTANT, |
| new Percentile(metrics.metricName("test.median", "percentiles"), 50.0), |
| new Percentile(metrics.metricName("test.perc99_9", "percentiles"), 99.9))); |
| break; |
| case METER: |
| sensor.add(new Meter(metrics.metricName("test.metric.meter.rate", "meter", tags), |
| metrics.metricName("test.metric.meter.total", "meter", tags))); |
| break; |
| default: |
| throw new IllegalStateException("Invalid stat type " + statType); |
| } |
| return sensor; |
| } |
| } |
| } |