blob: 0dabe468e49e37311dce330cc97dd2b2fc76a9cc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.impl;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.junit.Assert.assertEquals;
import org.apache.log4j.Logger;
import org.junit.Test;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
public class TestMetricsSourceAdapter {
private static final int RACE_TEST_RUNTIME = 10000; // 10 seconds
@Test
public void testPurgeOldMetrics() throws Exception {
// create test source with a single metric counter of value 1
PurgableSource source = new PurgableSource();
MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(source);
final MetricsSource s = sb.build();
List<MetricsTag> injectedTags = new ArrayList<MetricsTag>();
MetricsSourceAdapter sa = new MetricsSourceAdapter(
"tst", "tst", "testdesc", s, injectedTags, null, null, 1, false);
MBeanInfo info = sa.getMBeanInfo();
boolean sawIt = false;
for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) {
sawIt |= mBeanAttributeInfo.getName().equals(source.lastKeyName);
};
assertTrue("The last generated metric is not exported to jmx", sawIt);
Thread.sleep(1000); // skip JMX cache TTL
info = sa.getMBeanInfo();
sawIt = false;
for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) {
sawIt |= mBeanAttributeInfo.getName().equals(source.lastKeyName);
};
assertTrue("The last generated metric is not exported to jmx", sawIt);
}
//generate a new key per each call
private static class PurgableSource implements MetricsSource {
int nextKey = 0;
String lastKeyName = null;
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder rb =
collector.addRecord("purgablesource")
.setContext("test");
lastKeyName = "key" + nextKey++;
rb.addGauge(info(lastKeyName, "desc"), 1);
}
}
@Test
public void testGetMetricsAndJmx() throws Exception {
// create test source with a single metric counter of value 0
TestSource source = new TestSource("test");
MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(source);
final MetricsSource s = sb.build();
List<MetricsTag> injectedTags = new ArrayList<MetricsTag>();
MetricsSourceAdapter sa = new MetricsSourceAdapter(
"test", "test", "test desc", s, injectedTags, null, null, 1, false);
// all metrics are initially assumed to have changed
MetricsCollectorImpl builder = new MetricsCollectorImpl();
Iterable<MetricsRecordImpl> metricsRecords = sa.getMetrics(builder, true);
// Validate getMetrics and JMX initial values
MetricsRecordImpl metricsRecord = metricsRecords.iterator().next();
assertEquals(0L,
metricsRecord.metrics().iterator().next().value().longValue());
Thread.sleep(100); // skip JMX cache TTL
assertEquals(0L, (Number)sa.getAttribute("C1"));
// change metric value
source.incrementCnt();
// validate getMetrics and JMX
builder = new MetricsCollectorImpl();
metricsRecords = sa.getMetrics(builder, true);
metricsRecord = metricsRecords.iterator().next();
assertTrue(metricsRecord.metrics().iterator().hasNext());
Thread.sleep(100); // skip JMX cache TTL
assertEquals(1L, (Number)sa.getAttribute("C1"));
}
@SuppressWarnings("unused")
@Metrics(context="test")
private static class TestSource {
@Metric("C1 desc") MutableCounterLong c1;
final MetricsRegistry registry;
TestSource(String recName) {
registry = new MetricsRegistry(recName);
}
public void incrementCnt() {
c1.incr();
}
}
/**
* Test a race condition when updating the JMX cache (HADOOP-12482):
* 1. Thread A reads the JMX metric every 2 JMX cache TTL. It marks the JMX
* cache to be updated by marking lastRecs to null. After this it adds a
* new key to the metrics. The next call to read should pick up this new
* key.
* 2. Thread B triggers JMX metric update every 1 JMX cache TTL. It assigns
* lastRecs to a new object (not null any more).
* 3. Thread A tries to read JMX metric again, sees lastRecs is not null and
* does not update JMX cache. As a result the read does not pickup the new
* metric.
* @throws Exception
*/
@Test
public void testMetricCacheUpdateRace() throws Exception {
// Create test source with a single metric counter of value 1.
TestMetricsSource source = new TestMetricsSource();
MetricsSourceBuilder sourceBuilder =
MetricsAnnotations.newSourceBuilder(source);
final long JMX_CACHE_TTL = 250; // ms
List<MetricsTag> injectedTags = new ArrayList<>();
MetricsSourceAdapter sourceAdapter =
new MetricsSourceAdapter("test", "test",
"test JMX cache update race condition", sourceBuilder.build(),
injectedTags, null, null, JMX_CACHE_TTL, false);
ScheduledExecutorService updaterExecutor =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
ScheduledExecutorService readerExecutor =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
final AtomicBoolean hasError = new AtomicBoolean(false);
// Wake up every 1 JMX cache TTL to set lastRecs before updateJmxCache() is
// called.
SourceUpdater srcUpdater = new SourceUpdater(sourceAdapter, hasError);
ScheduledFuture<?> updaterFuture =
updaterExecutor.scheduleAtFixedRate(srcUpdater,
sourceAdapter.getJmxCacheTTL(), sourceAdapter.getJmxCacheTTL(),
TimeUnit.MILLISECONDS);
srcUpdater.setFuture(updaterFuture);
// Wake up every 2 JMX cache TTL so updateJmxCache() will try to update
// JMX cache.
SourceReader srcReader = new SourceReader(source, sourceAdapter, hasError);
ScheduledFuture<?> readerFuture =
readerExecutor.scheduleAtFixedRate(srcReader,
0, // set JMX info cache at the beginning
2 * sourceAdapter.getJmxCacheTTL(), TimeUnit.MILLISECONDS);
srcReader.setFuture(readerFuture);
// Let the threads do their work.
Thread.sleep(RACE_TEST_RUNTIME);
assertFalse("Hit error", hasError.get());
// cleanup
updaterExecutor.shutdownNow();
readerExecutor.shutdownNow();
updaterExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
readerExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
/**
* Thread safe source: stores a key value pair. Allows thread safe key-value
* pair reads/writes.
*/
private static class TestMetricsSource implements MetricsSource {
private String key = "key0";
private int val = 0;
synchronized String getKey() {
return key;
}
synchronized void setKV(final String newKey, final int newVal) {
key = newKey;
val = newVal;
}
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder rb =
collector.addRecord("TestMetricsSource").setContext("test");
synchronized(this) {
rb.addGauge(info(key, "TestMetricsSource key"), val);
}
}
}
/**
* An thread that updates the metrics source every 1 JMX cache TTL
*/
private static class SourceUpdater implements Runnable {
private MetricsSourceAdapter sa = null;
private ScheduledFuture<?> future = null;
private AtomicBoolean hasError = null;
private static final Logger LOG = Logger.getLogger(SourceUpdater.class);
public SourceUpdater(MetricsSourceAdapter sourceAdapter,
AtomicBoolean err) {
sa = sourceAdapter;
hasError = err;
}
public void setFuture(ScheduledFuture<?> f) {
future = f;
}
@Override
public void run() {
MetricsCollectorImpl builder = new MetricsCollectorImpl();
try {
// This resets lastRecs.
sa.getMetrics(builder, true);
LOG.info("reset lastRecs");
} catch (Exception e) {
// catch all errors
hasError.set(true);
LOG.error(e.getStackTrace());
} finally {
if (hasError.get()) {
LOG.error("Hit error, stopping now");
future.cancel(false);
}
}
}
}
/**
* An thread that reads the metrics source every JMX cache TTL. After each
* read it updates the metric source to report a new key. The next read must
* be able to pick up this new key.
*/
private static class SourceReader implements Runnable {
private MetricsSourceAdapter sa = null;
private TestMetricsSource src = null;
private int cnt = 0;
private ScheduledFuture<?> future = null;
private AtomicBoolean hasError = null;
private static final Logger LOG = Logger.getLogger(SourceReader.class);
public SourceReader(
TestMetricsSource source, MetricsSourceAdapter sourceAdapter,
AtomicBoolean err) {
src = source;
sa = sourceAdapter;
hasError = err;
}
public void setFuture(ScheduledFuture<?> f) {
future = f;
}
@Override
public void run() {
try {
// This will trigger updateJmxCache().
MBeanInfo info = sa.getMBeanInfo();
final String key = src.getKey();
for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) {
// Found the new key, update the metric source and move on.
if (mBeanAttributeInfo.getName().equals(key)) {
LOG.info("found key/val=" + cnt + "/" + cnt);
cnt++;
src.setKV("key" + cnt, cnt);
return;
}
}
LOG.error("key=" + key + " not found. Stopping now.");
hasError.set(true);
} catch (Exception e) {
// catch other errors
hasError.set(true);
LOG.error(e.getStackTrace());
} finally {
if (hasError.get()) {
future.cancel(false);
}
}
}
}
}