// blob: 8e3dfeb3b9fad78244eadea0c8719ce4fd991f7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Unit tests for {@link Sensor}: recording levels, sensor expiration, idempotent metric
 * registration, and concurrent use of {@code Sensor#checkQuotas}.
 *
 * <p>Every test that creates a {@link Metrics} instance closes it in a {@code finally} block so
 * that a failing assertion cannot skip the {@code close()} call.
 */
public class SensorTest {

    /**
     * Checks {@code RecordingLevel#shouldRecord} for each combination of sensor level and
     * configured level id, and that {@code valueOf} round-trips the output of {@code toString}.
     */
    @Test
    public void testRecordLevelEnum() {
        // Configured at INFO: INFO is recorded, DEBUG is not.
        Sensor.RecordingLevel configLevel = Sensor.RecordingLevel.INFO;
        assertTrue(Sensor.RecordingLevel.INFO.shouldRecord(configLevel.id));
        assertFalse(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));

        // Configured at DEBUG: both levels are recorded.
        configLevel = Sensor.RecordingLevel.DEBUG;
        assertTrue(Sensor.RecordingLevel.INFO.shouldRecord(configLevel.id));
        assertTrue(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));

        assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()),
            Sensor.RecordingLevel.DEBUG);
        assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()),
            Sensor.RecordingLevel.INFO);
    }

    /**
     * Checks {@code Sensor#shouldRecord} for sensors built with each recording level under both a
     * DEBUG-level and an INFO-level {@link MetricConfig}. Only a DEBUG-level sensor under an
     * INFO-level config is expected not to record.
     */
    @Test
    public void testShouldRecord() {
        MetricConfig debugConfig = new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG);
        MetricConfig infoConfig = new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO);

        // DEBUG-level config: sensors of either level record.
        Sensor infoSensor = new Sensor(null, "infoSensor", null, debugConfig, new SystemTime(),
            0, Sensor.RecordingLevel.INFO);
        assertTrue(infoSensor.shouldRecord());
        infoSensor = new Sensor(null, "infoSensor", null, debugConfig, new SystemTime(),
            0, Sensor.RecordingLevel.DEBUG);
        assertTrue(infoSensor.shouldRecord());

        // INFO-level config: the INFO-level sensor records, the DEBUG-level sensor does not.
        Sensor debugSensor = new Sensor(null, "debugSensor", null, infoConfig, new SystemTime(),
            0, Sensor.RecordingLevel.INFO);
        assertTrue(debugSensor.shouldRecord());
        debugSensor = new Sensor(null, "debugSensor", null, infoConfig, new SystemTime(),
            0, Sensor.RecordingLevel.DEBUG);
        assertFalse(debugSensor.shouldRecord());
    }

    /**
     * Checks that {@code Sensor#add} succeeds before the mock clock passes the sensor's inactivity
     * expiration time and returns {@code false} for both overloads afterwards.
     */
    @Test
    public void testExpiredSensor() {
        MetricConfig config = new MetricConfig();
        Time mockTime = new MockTime();
        Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), mockTime, true);
        try {
            long inactiveSensorExpirationTimeSeconds = 60L;
            Sensor sensor = new Sensor(metrics, "sensor", null, config, mockTime,
                inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel.INFO);

            assertTrue(sensor.add(metrics.metricName("test1", "grp1"), new Avg()));

            Map<String, String> emptyTags = Collections.emptyMap();
            MetricName rateMetricName = new MetricName("rate", "test", "", emptyTags);
            MetricName totalMetricName = new MetricName("total", "test", "", emptyTags);
            Meter meter = new Meter(rateMetricName, totalMetricName);
            assertTrue(sensor.add(meter));

            // Move the mock clock one second beyond the expiration time.
            mockTime.sleep(TimeUnit.SECONDS.toMillis(inactiveSensorExpirationTimeSeconds + 1));
            assertFalse(sensor.add(metrics.metricName("test3", "grp1"), new Avg()));
            assertFalse(sensor.add(meter));
        } finally {
            // Close even when an assertion above fails.
            metrics.close();
        }
    }

    /**
     * Checks that re-adding an already registered metric name to the same sensor is a no-op that
     * returns {@code true}, while adding it to a different sensor throws
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testIdempotentAdd() {
        final Metrics metrics = new Metrics();
        try {
            final Sensor sensor = metrics.sensor("sensor");

            assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));

            // adding the same metric to the same sensor is a no-op
            assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));

            // but adding the same metric to a DIFFERENT sensor is an error
            final Sensor anotherSensor = metrics.sensor("another-sensor");
            try {
                anotherSensor.add(metrics.metricName("test-metric", "test-group"), new Avg());
                fail("should have thrown");
            } catch (final IllegalArgumentException ignored) {
                // pass
            }

            // note that adding a different metric with the same name is also a no-op
            assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Sum()));

            // so after all this, we still just have the original metric registered
            assertEquals(1, sensor.metrics().size());
            assertEquals(Avg.class, sensor.metrics().get(0).measurable().getClass());
        } finally {
            metrics.close();
        }
    }

    /**
     * The Sensor#checkQuotas should be thread-safe since the method may be used by many ReplicaFetcherThreads.
     *
     * <p>Starts several workers behind a common latch; each one records values and calls
     * {@code checkQuotas()}. The test fails if any worker reports a {@link Throwable}.
     */
    @Test
    public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException {
        final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
            // decreasing the value of the time window makes SampledStat always record the given value
            .timeWindow(1, TimeUnit.MILLISECONDS)
            // increasing the number of samples makes SampledStat store more samples
            .samples(100));
        try {
            final Sensor sensor = metrics.sensor("sensor");
            assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Rate()));

            final int threadCount = 10;
            final CountDownLatch latch = new CountDownLatch(1);
            ExecutorService service = Executors.newFixedThreadPool(threadCount);
            List<Future<Throwable>> workers = new ArrayList<>(threadCount);
            boolean needShutdown = true;
            try {
                for (int i = 0; i != threadCount; ++i) {
                    workers.add(service.submit(quotaCheckingWorker(sensor, latch, i)));
                }
                // Release all workers at once.
                latch.countDown();
                service.shutdown();
                assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
                needShutdown = false;
                for (Future<Throwable> callable : workers) {
                    assertTrue("If this failure happen frequently, we can try to increase the wait time", callable.isDone());
                    assertNull("Sensor#checkQuotas SHOULD be thread-safe!", callable.get());
                }
            } finally {
                // Reached with needShutdown still true only when something above threw.
                if (needShutdown) {
                    service.shutdownNow();
                }
            }
        } finally {
            metrics.close();
        }
    }

    /**
     * Builds one worker task for {@link #testCheckQuotasInMultiThreads()}. The task waits up to five
     * seconds on {@code latch}, then records twenty values on {@code sensor}, calling
     * {@code checkQuotas()} after each one.
     *
     * @param sensor the sensor shared by all workers
     * @param latch  the start signal the worker waits on
     * @param index  the worker's index, used as a multiplier for the recorded values
     * @return a task that yields {@code null} on success, or the {@link Throwable} it caught
     */
    private static Callable<Throwable> quotaCheckingWorker(final Sensor sensor,
                                                           final CountDownLatch latch,
                                                           final int index) {
        return new Callable<Throwable>() {
            @Override
            public Throwable call() {
                try {
                    assertTrue(latch.await(5, TimeUnit.SECONDS));
                    for (int j = 0; j != 20; ++j) {
                        sensor.record(j * index, System.currentTimeMillis() + j, false);
                        sensor.checkQuotas();
                    }
                    return null;
                } catch (Throwable e) {
                    // Hand the failure back through the Future so the test thread can assert on it.
                    return e;
                }
            }
        };
    }
}