blob: 5d76a2110b98c1abdfeca7f06737165d1227e541 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;
import static java.util.stream.Collectors.toMap;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.gc.metrics.GcMetrics;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.metrics.MetricsFileTailer;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Functional test that uses a hadoop metrics 2 file sink to read published metrics for
* verification.
*/
public class GcMetricsIT extends ConfigurableMacBase {
private static final Logger log = LoggerFactory.getLogger(GcMetricsIT.class);
private static final int NUM_TAIL_ATTEMPTS = 20;
private static final long TAIL_DELAY = 5_000;
private static final Pattern metricLinePattern = Pattern.compile("^(?<timestamp>\\d+).*");
private static final String[] EXPECTED_METRIC_KEYS = new String[] {"AccGcCandidates",
"AccGcDeleted", "AccGcErrors", "AccGcFinished", "AccGcInUse", "AccGcPostOpDuration",
"AccGcRunCycleCount", "AccGcStarted", "AccGcWalCandidates", "AccGcWalDeleted",
"AccGcWalErrors", "AccGcWalFinished", "AccGcWalInUse", "AccGcWalStarted"};
@Override
protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.GC_METRICS_ENABLED, "true");
cfg.setProperty(Property.GC_CYCLE_START, "5s");
cfg.setProperty(Property.GC_CYCLE_DELAY, "15s");
}
@Override
protected int defaultTimeoutSeconds() {
return 4 * 60;
}
private final MetricsFileTailer gcTail = new MetricsFileTailer("accumulo.sink.file-gc");
@Before
public void startTailer() {
gcTail.startDaemonThread();
}
@After
public void stopTailer() {
gcTail.close();
}
@Test
public void gcMetricsPublished() throws Exception {
assumeTrue("gc metrics are disabled with GC_METRICS_ENABLED=false",
cluster.getSiteConfiguration().getBoolean(Property.GC_METRICS_ENABLED));
// uncomment for manual jmx / jconsole validation - not for automated testing
// Thread.sleep(320_000);
long testStart = System.currentTimeMillis();
log.debug("Test started: {}", testStart);
LineUpdate firstGc = captureMetricsAfterTimestamp(testStart);
log.debug("First captured metrics finished at {}", firstGc.gcWalFinished);
LineUpdate secondGc = captureMetricsAfterTimestamp(firstGc.gcWalFinished);
log.debug("Second captured metrics finished at {}", secondGc.gcWalFinished);
firstGc.compareWithSubsequentRun(secondGc);
}
private static class LineUpdate {
private final long gcStarted;
private final long gcFinished;
private final long gcWalStarted;
private final long gcWalFinished;
private final long gcRunCycleCount;
private final Map<String,Long> values;
LineUpdate(String line) {
values = parseLine(line);
gcStarted = values.get("AccGcStarted");
gcFinished = values.get("AccGcFinished");
gcWalStarted = values.get("AccGcWalStarted");
gcWalFinished = values.get("AccGcWalFinished");
gcRunCycleCount = values.get("AccGcRunCycleCount");
// ensure internal consistency
assertTrue(Stream.of(EXPECTED_METRIC_KEYS).allMatch(values::containsKey));
assertTrue(gcStarted <= gcFinished);
assertTrue(gcWalStarted <= gcWalFinished);
}
void compareWithSubsequentRun(LineUpdate update) {
log.debug("First run: {}, Second run: {}", values, update.values);
assertTrue("first gc should finish before second starts", gcFinished < update.gcStarted);
assertTrue("first gcWal should finish before second starts",
gcWalFinished < update.gcWalStarted);
assertTrue("cycle count should increment", gcRunCycleCount < update.gcRunCycleCount);
}
/**
* The hadoop metrics file sink published records as a line with comma separated key=value
* pairs. This method parses the line and extracts the key, value pair from metrics that start
* with AccGc and returns them in a sorted map.
*
* @return a map of the metrics that start with AccGc
*/
private static Map<String,Long> parseLine(String line) {
log.debug("Line received:{}", line);
Map<String,Long> values = Collections.emptyMap();
if (line != null) {
values = Stream.of(line.split(",")).map(String::trim)
.filter(s -> s.startsWith(GcMetrics.GC_METRIC_PREFIX)).map(s -> s.split("="))
.collect(toMap(a -> a[0], a -> Long.parseLong(a[1]), (a, b) -> {
throw new IllegalArgumentException("Metric field found with two values '" + a
+ "' and '" + b + "' in GC Metrics line: " + line);
}, TreeMap::new));
}
log.debug("Mapped values:{}", values);
return values;
}
}
private LineUpdate captureMetricsAfterTimestamp(final long timestamp) throws Exception {
for (int count = 0; count < NUM_TAIL_ATTEMPTS; count++) {
String line = gcTail.getLast();
// check for valid metrics line update since the previous update
if (line != null && isValidRecentMetricsLine(line, timestamp)) {
LineUpdate lineUpdate = new LineUpdate(line);
// capture metrics for a regular gc event and a wal gc event that both occurred
// after the specified timestamp, and not just the same entry in the metrics file
// from a subsequent poll that represents the previous gc events;
// note that regular gc events can be updated in the metrics before the subsequent
// wal gc event that follows it, so the wal gc timestamps may be earlier than the
// regular gc timestamps; that's okay, as long as they both occur after the provided
// timestamp, and are internally consistent (start times < finish times)
if (timestamp < lineUpdate.gcStarted && timestamp < lineUpdate.gcWalStarted) {
return lineUpdate;
}
}
Thread.sleep(TAIL_DELAY);
}
throw new IllegalStateException(
String.format("File source update not received after %d tries in %d sec", NUM_TAIL_ATTEMPTS,
TimeUnit.MILLISECONDS.toSeconds(TAIL_DELAY * NUM_TAIL_ATTEMPTS)));
}
private boolean isValidRecentMetricsLine(final String line, final long prevTimestamp) {
if (Objects.isNull(line)) {
return false;
}
Matcher m = metricLinePattern.matcher(line);
if (m.matches()) {
try {
long timestamp = Long.parseLong(m.group("timestamp"));
return timestamp > prevTimestamp;
} catch (NumberFormatException ex) {
log.debug("Could not parse timestamp from line '{}", line);
return false;
}
}
return false;
}
}