// blob: 77176e1f5c5d2ee4dbe19283f90249ea166c8b04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class SensorTest {
// Shared metric configs, one per recording level, used by the shouldRecord tests below
// to play the role of the configured level against sensors of different levels.
// Config whose record level is INFO.
private static final MetricConfig INFO_CONFIG = new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO);
// Config whose record level is DEBUG.
private static final MetricConfig DEBUG_CONFIG = new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG);
// Config whose record level is TRACE.
private static final MetricConfig TRACE_CONFIG = new MetricConfig().recordLevel(Sensor.RecordingLevel.TRACE);
/**
 * Checks {@code Sensor.RecordingLevel#shouldRecord} for every combination of sensor level and
 * configured level, and that each level survives a {@code toString()}/{@code valueOf()} round trip.
 */
@Test
public void testRecordLevelEnum() {
    // Configured level INFO: only INFO sensors are expected to record.
    Sensor.RecordingLevel configLevel = Sensor.RecordingLevel.INFO;
    assertTrue(Sensor.RecordingLevel.INFO.shouldRecord(configLevel.id));
    assertFalse(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
    assertFalse(Sensor.RecordingLevel.TRACE.shouldRecord(configLevel.id));

    // Configured level DEBUG: INFO and DEBUG sensors are expected to record.
    configLevel = Sensor.RecordingLevel.DEBUG;
    assertTrue(Sensor.RecordingLevel.INFO.shouldRecord(configLevel.id));
    assertTrue(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
    assertFalse(Sensor.RecordingLevel.TRACE.shouldRecord(configLevel.id));

    // Configured level TRACE: every sensor level is expected to record.
    configLevel = Sensor.RecordingLevel.TRACE;
    assertTrue(Sensor.RecordingLevel.INFO.shouldRecord(configLevel.id));
    assertTrue(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
    assertTrue(Sensor.RecordingLevel.TRACE.shouldRecord(configLevel.id));

    // Round trip through the string form. The expected constant goes first so that a
    // failure message reports "expected"/"actual" the right way round.
    assertEquals(Sensor.RecordingLevel.DEBUG,
        Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()));
    assertEquals(Sensor.RecordingLevel.INFO,
        Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()));
    assertEquals(Sensor.RecordingLevel.TRACE,
        Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.TRACE.toString()));
}
/**
 * An INFO-level sensor is expected to record under every configured level (INFO, DEBUG, TRACE).
 */
@Test
public void testShouldRecordForInfoLevelSensor() {
    final Sensor.RecordingLevel sensorLevel = Sensor.RecordingLevel.INFO;
    for (final MetricConfig config : Arrays.asList(INFO_CONFIG, DEBUG_CONFIG, TRACE_CONFIG)) {
        final Sensor infoSensor =
            new Sensor(null, "infoSensor", null, config, new SystemTime(), 0, sensorLevel);
        assertTrue(infoSensor.shouldRecord());
    }
}
/**
 * A DEBUG-level sensor is expected to stay silent under an INFO config and to record
 * under DEBUG and TRACE configs.
 */
@Test
public void testShouldRecordForDebugLevelSensor() {
    final Sensor.RecordingLevel sensorLevel = Sensor.RecordingLevel.DEBUG;

    final Sensor underInfoConfig =
        new Sensor(null, "debugSensor", null, INFO_CONFIG, new SystemTime(), 0, sensorLevel);
    assertFalse(underInfoConfig.shouldRecord());

    final Sensor underDebugConfig =
        new Sensor(null, "debugSensor", null, DEBUG_CONFIG, new SystemTime(), 0, sensorLevel);
    assertTrue(underDebugConfig.shouldRecord());

    final Sensor underTraceConfig =
        new Sensor(null, "debugSensor", null, TRACE_CONFIG, new SystemTime(), 0, sensorLevel);
    assertTrue(underTraceConfig.shouldRecord());
}
/**
 * A TRACE-level sensor is expected to record only when the configured level is TRACE.
 */
@Test
public void testShouldRecordForTraceLevelSensor() {
    final Sensor.RecordingLevel sensorLevel = Sensor.RecordingLevel.TRACE;

    // Configs below TRACE must not enable the sensor.
    for (final MetricConfig quieterConfig : Arrays.asList(INFO_CONFIG, DEBUG_CONFIG)) {
        final Sensor silentSensor =
            new Sensor(null, "traceSensor", null, quieterConfig, new SystemTime(), 0, sensorLevel);
        assertFalse(silentSensor.shouldRecord());
    }

    final Sensor activeSensor =
        new Sensor(null, "traceSensor", null, TRACE_CONFIG, new SystemTime(), 0, sensorLevel);
    assertTrue(activeSensor.shouldRecord());
}
/**
 * Adds metrics to a sensor before and after its inactivity expiration time has elapsed on a
 * mock clock: additions are expected to succeed beforehand and to be refused afterwards.
 */
@Test
public void testExpiredSensor() {
    final long expirationSeconds = 60L;
    final MetricConfig config = new MetricConfig();
    final Time mockTime = new MockTime();
    try (Metrics metrics = new Metrics(config, Arrays.asList(new JmxReporter()), mockTime, true)) {
        final Sensor sensor = new Sensor(metrics, "sensor", null, config, mockTime,
            expirationSeconds, Sensor.RecordingLevel.INFO);

        // While the sensor is still live, both a single stat and a compound stat can be added.
        assertTrue(sensor.add(metrics.metricName("test1", "grp1"), new Avg()));
        final Map<String, String> noTags = Collections.emptyMap();
        final MetricName rateName = new MetricName("rate", "test", "", noTags);
        final MetricName totalName = new MetricName("total", "test", "", noTags);
        final Meter meter = new Meter(rateName, totalName);
        assertTrue(sensor.add(meter));

        // Move the mock clock one second past the expiration time.
        final long sleepMs = TimeUnit.SECONDS.toMillis(expirationSeconds + 1);
        mockTime.sleep(sleepMs);

        // Both flavours of add are now expected to be rejected.
        assertFalse(sensor.add(metrics.metricName("test3", "grp1"), new Avg()));
        assertFalse(sensor.add(meter));
    }
}
/**
 * Verifies the add semantics of a sensor: re-adding a metric name to the same sensor is a
 * no-op that returns true, adding it to a different sensor throws
 * {@link IllegalArgumentException}, and the originally registered stat is the one retained.
 */
@Test
public void testIdempotentAdd() {
    // try-with-resources so the registry is closed even when an assertion fails.
    try (Metrics metrics = new Metrics()) {
        final Sensor sensor = metrics.sensor("sensor");
        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));
        // adding the same metric to the same sensor is a no-op
        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));
        // but adding the same metric to a DIFFERENT sensor is an error
        final Sensor anotherSensor = metrics.sensor("another-sensor");
        try {
            anotherSensor.add(metrics.metricName("test-metric", "test-group"), new Avg());
            fail("should have thrown");
        } catch (final IllegalArgumentException ignored) {
            // pass
        }
        // note that adding a different metric with the same name is also a no-op
        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new WindowedSum()));
        // so after all this, we still just have the original metric registered
        assertEquals(1, sensor.metrics().size());
        assertEquals(Avg.class, sensor.metrics().get(0).measurable().getClass());
    }
}
/**
 * The Sensor#checkQuotas should be thread-safe since the method may be used by many ReplicaFetcherThreads.
 * <p>
 * Ten workers are released together by a latch; each records values on and checks quotas of the
 * same sensor twenty times. Any {@link Throwable} raised in a worker is returned through its
 * {@link Future} and fails the test.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the workers
 * @throws ExecutionException if retrieving a worker's result fails
 */
@Test
public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException {
    final MetricConfig config = new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
        // decreasing the time window makes SampledStat always record the given value
        .timeWindow(1, TimeUnit.MILLISECONDS)
        // increasing the number of samples makes SampledStat store more samples
        .samples(100);
    // try-with-resources so the registry is closed whether the test passes or fails.
    try (Metrics metrics = new Metrics(config)) {
        final Sensor sensor = metrics.sensor("sensor");
        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Rate()));
        final int threadCount = 10;
        final CountDownLatch latch = new CountDownLatch(1);
        final ExecutorService service = Executors.newFixedThreadPool(threadCount);
        final List<Future<Throwable>> workers = new ArrayList<>(threadCount);
        // Stays true until the pool has terminated cleanly; drives the forced shutdown below.
        boolean needShutdown = true;
        try {
            for (int i = 0; i != threadCount; ++i) {
                final int index = i;
                // Typed as Callable so submit() resolves to the value-returning overload.
                final Callable<Throwable> worker = () -> {
                    try {
                        // Wait for the start signal so all workers hit the sensor concurrently.
                        assertTrue(latch.await(5, TimeUnit.SECONDS));
                        for (int j = 0; j != 20; ++j) {
                            sensor.record(j * index, System.currentTimeMillis() + j, false);
                            sensor.checkQuotas();
                        }
                        return null;
                    } catch (Throwable e) {
                        // Hand the failure back to the test thread instead of losing it in the pool.
                        return e;
                    }
                };
                workers.add(service.submit(worker));
            }
            latch.countDown();
            service.shutdown();
            assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
            needShutdown = false;
            for (Future<Throwable> callable : workers) {
                assertTrue(callable.isDone(), "If this failure happen frequently, we can try to increase the wait time");
                assertNull(callable.get(), "Sensor#checkQuotas SHOULD be thread-safe!");
            }
        } finally {
            if (needShutdown) {
                service.shutdownNow();
            }
        }
    }
}
/**
 * Verifies that {@code Sensor#hasMetrics} is false for a fresh sensor and true once one or
 * more metrics have been added.
 */
@Test
public void shouldReturnPresenceOfMetrics() {
    // try-with-resources so the registry is closed even when an assertion fails.
    try (Metrics metrics = new Metrics()) {
        final Sensor sensor = metrics.sensor("sensor");

        assertFalse(sensor.hasMetrics());

        sensor.add(
            new MetricName("name1", "group1", "description1", Collections.emptyMap()),
            new WindowedSum()
        );

        assertTrue(sensor.hasMetrics());

        sensor.add(
            new MetricName("name2", "group2", "description2", Collections.emptyMap()),
            new CumulativeCount()
        );

        assertTrue(sensor.hasMetrics());
    }
}
/**
 * Demonstrates strict quota enforcement with a windowed {@link Rate}: after an initial burst
 * pushes the measured rate above the quota, waiting the theoretically sufficient 5s does not
 * lower the measured rate, so the next strict record is still rejected.
 */
@Test
public void testStrictQuotaEnforcementWithRate() {
    final Time time = new MockTime(0, System.currentTimeMillis(), 0);
    // try-with-resources so the registry is closed even when an assertion fails.
    try (Metrics metrics = new Metrics(time)) {
        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
            .quota(Quota.upperBound(2))
            .timeWindow(1, TimeUnit.SECONDS)
            .samples(11));
        final MetricName metricName = metrics.metricName("rate", "test-group");
        assertTrue(sensor.add(metricName, new Rate()));
        final KafkaMetric rateMetric = metrics.metric(metricName);

        // Recording a first value at T+0 to bring the avg rate to 3 which is already
        // above the quota.
        strictRecord(sensor, 30, time.milliseconds());
        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);

        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
        // ((30 / 10) - 2) / 2 * 10 = 5s
        time.sleep(5000);

        // But, recording a second value is rejected because the avg rate is still equal
        // to 3 after 5s.
        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));
    }
}
/**
 * Demonstrates strict quota enforcement with a {@link TokenBucket}: an initial burst drives the
 * remaining credits negative, waiting 5s brings them back to zero, and a second strict record
 * is then accepted.
 */
@Test
public void testStrictQuotaEnforcementWithTokenBucket() {
    final Time time = new MockTime(0, System.currentTimeMillis(), 0);
    // try-with-resources so the registry is closed even when an assertion fails.
    try (Metrics metrics = new Metrics(time)) {
        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
            .quota(Quota.upperBound(2))
            .timeWindow(1, TimeUnit.SECONDS)
            .samples(10));
        final MetricName metricName = metrics.metricName("credits", "test-group");
        assertTrue(sensor.add(metricName, new TokenBucket()));
        final KafkaMetric tkMetric = metrics.metric(metricName);

        // Recording a first value at T+0 to bring the remaining credits below zero
        strictRecord(sensor, 30, time.milliseconds());
        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);

        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
        // 10 / 2 = 5s
        time.sleep(5000);

        // Unlike the default rate based on a windowed sum, it works as expected.
        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
        strictRecord(sensor, 30, time.milliseconds());
        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
    }
}
/**
 * Checks the sensor's quotas at {@code timeMs} and then records {@code value} at the same
 * timestamp, holding the sensor's monitor across both calls so no other thread synchronizing
 * on the sensor can interleave between the check and the record.
 * <p>
 * One test in this class expects a {@link QuotaViolationException} from this helper;
 * presumably it is raised by the quota check, in which case the value is not recorded.
 *
 * @param sensor the sensor to check and to record on
 * @param value the value to record
 * @param timeMs the timestamp in milliseconds passed to both the quota check and the record
 */
private void strictRecord(Sensor sensor, double value, long timeMs) {
synchronized (sensor) {
// Check first: the record below only happens if this call returns normally.
sensor.checkQuotas(timeMs);
// NOTE(review): the trailing `false` presumably tells record() not to check quotas again — confirm against Sensor#record.
sensor.record(value, timeMs, false);
}
}
/**
 * Verifies that when two stats are added to one sensor with different configs, recording and
 * quota checking call each stat with its own {@link MetricConfig}.
 */
@Test
public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() {
    final Time time = new MockTime(0, System.currentTimeMillis(), 0);
    // try-with-resources so the registry is closed even when a verification fails.
    try (Metrics metrics = new Metrics(time)) {
        final Sensor sensor = metrics.sensor("sensor");

        final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class);
        final MetricName stat1Name = metrics.metricName("stat1", "test-group");
        final MetricConfig stat1Config = new MetricConfig().quota(Quota.upperBound(5));
        sensor.add(stat1Name, stat1, stat1Config);

        final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class);
        final MetricName stat2Name = metrics.metricName("stat2", "test-group");
        final MetricConfig stat2Config = new MetricConfig().quota(Quota.upperBound(10));
        sensor.add(stat2Name, stat2, stat2Config);

        // Each stat must be recorded with the config it was registered with.
        sensor.record(10, 1);
        Mockito.verify(stat1).record(stat1Config, 10, 1);
        Mockito.verify(stat2).record(stat2Config, 10, 1);

        // Likewise, the quota check must measure each stat with its own config.
        sensor.checkQuotas(2);
        Mockito.verify(stat1).measure(stat1Config, 2);
        Mockito.verify(stat2).measure(stat2Config, 2);
    }
}
/**
 * Verifies that replacing the {@link MetricConfig} on a registered {@link KafkaMetric} changes
 * the config the sensor hands to the stat on subsequent record and quota-check calls.
 */
@Test
public void testUpdatingMetricConfigIsReflectedInTheSensor() {
    final Time time = new MockTime(0, System.currentTimeMillis(), 0);
    // try-with-resources so the registry is closed even when a verification fails.
    try (Metrics metrics = new Metrics(time)) {
        final Sensor sensor = metrics.sensor("sensor");

        final MeasurableStat stat = Mockito.mock(MeasurableStat.class);
        final MetricName statName = metrics.metricName("stat", "test-group");
        final MetricConfig statConfig = new MetricConfig().quota(Quota.upperBound(5));
        sensor.add(statName, stat, statConfig);

        // With the original config in place, the stat sees statConfig.
        sensor.record(10, 1);
        Mockito.verify(stat).record(statConfig, 10, 1);

        sensor.checkQuotas(2);
        Mockito.verify(stat).measure(statConfig, 2);

        // Update the config of the KafkaMetric
        final MetricConfig newConfig = new MetricConfig().quota(Quota.upperBound(10));
        metrics.metric(statName).config(newConfig);

        // From now on the stat must see newConfig instead.
        sensor.record(10, 3);
        Mockito.verify(stat).record(newConfig, 10, 3);

        sensor.checkQuotas(4);
        Mockito.verify(stat).measure(newConfig, 4);
    }
}
}