| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.metrics2.sink; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.net.InetAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.UnknownHostException; |
| import java.nio.charset.StandardCharsets; |
| import java.text.SimpleDateFormat; |
| import java.util.Calendar; |
| import java.util.Date; |
| import java.util.TimeZone; |
| import java.util.regex.Pattern; |
| import org.apache.commons.configuration.SubsetConfiguration; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.metrics2.MetricsException; |
| import org.apache.hadoop.metrics2.MetricsRecord; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.annotation.Metric; |
| import org.apache.hadoop.metrics2.annotation.Metrics; |
| import org.apache.hadoop.metrics2.annotation.Metric.Type; |
| import org.apache.hadoop.metrics2.impl.ConfigBuilder; |
| import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; |
| import org.apache.hadoop.metrics2.impl.TestMetricsConfig; |
| import org.apache.hadoop.metrics2.lib.MutableGaugeInt; |
| import org.apache.hadoop.metrics2.lib.MutableGaugeLong; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.junit.AfterClass; |
| import org.junit.Rule; |
| import org.junit.rules.TestName; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import static org.junit.Assert.assertTrue; |
| |
| /** |
| * This class is a base class for testing the {@link RollingFileSystemSink} |
| * class in various contexts. It provides the a number of useful utility |
| * methods for classes that extend it. |
| */ |
| public class RollingFileSystemSinkTestBase { |
| protected static final String SINK_PRINCIPAL_KEY = "rfssink.principal"; |
| protected static final String SINK_KEYTAB_FILE_KEY = "rfssink.keytab"; |
| protected static final File ROOT_TEST_DIR = GenericTestUtils.getTestDir( |
| "RollingFileSystemSinkTest"); |
| protected static final SimpleDateFormat DATE_FORMAT = |
| new SimpleDateFormat("yyyyMMddHH"); |
| protected static File methodDir; |
| |
| /** |
| * The name of the current test method. |
| */ |
| @Rule |
| public TestName methodName = new TestName(); |
| |
| /** |
| * A sample metric class |
| */ |
| @Metrics(name="testRecord1", context="test1") |
| protected class MyMetrics1 { |
| @Metric(value={"testTag1", ""}, type=Type.TAG) |
| String testTag1() { return "testTagValue1"; } |
| |
| @Metric(value={"testTag2", ""}, type=Type.TAG) |
| String gettestTag2() { return "testTagValue2"; } |
| |
| @Metric(value={"testMetric1", "An integer gauge"}, always=true) |
| MutableGaugeInt testMetric1; |
| |
| @Metric(value={"testMetric2", "A long gauge"}, always=true) |
| MutableGaugeLong testMetric2; |
| |
| public MyMetrics1 registerWith(MetricsSystem ms) { |
| return ms.register(methodName.getMethodName() + "-m1", null, this); |
| } |
| } |
| |
| /** |
| * Another sample metrics class |
| */ |
| @Metrics(name="testRecord2", context="test1") |
| protected class MyMetrics2 { |
| @Metric(value={"testTag22", ""}, type=Type.TAG) |
| String testTag1() { return "testTagValue22"; } |
| |
| public MyMetrics2 registerWith(MetricsSystem ms) { |
| return ms.register(methodName.getMethodName() + "-m2", null, this); |
| } |
| } |
| |
| /** |
| * Set the date format's timezone to GMT. |
| */ |
| @BeforeClass |
| public static void setup() { |
| DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT")); |
| FileUtil.fullyDelete(ROOT_TEST_DIR); |
| } |
| |
| /** |
| * Delete the test directory for this test. |
| * @throws IOException thrown if the delete fails |
| */ |
| @AfterClass |
| public static void deleteBaseDir() throws IOException { |
| FileUtil.fullyDelete(ROOT_TEST_DIR); |
| } |
| |
| /** |
| * Create the test directory for this test. |
| * @throws IOException thrown if the create fails |
| */ |
| @Before |
| public void createMethodDir() throws IOException { |
| methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName()); |
| |
| assertTrue("Test directory already exists: " + methodDir, |
| methodDir.mkdirs()); |
| } |
| |
| /** |
| * Set up the metrics system, start it, and return it. The principal and |
| * keytab properties will not be set. |
| * |
| * @param path the base path for the sink |
| * @param ignoreErrors whether the sink should ignore errors |
| * @param allowAppend whether the sink is allowed to append to existing files |
| * @return the metrics system |
| */ |
| protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors, |
| boolean allowAppend) { |
| return initMetricsSystem(path, ignoreErrors, allowAppend, false); |
| } |
| |
| /** |
| * Set up the metrics system, start it, and return it. |
| * @param path the base path for the sink |
| * @param ignoreErrors whether the sink should ignore errors |
| * @param allowAppend whether the sink is allowed to append to existing files |
| * @param useSecureParams whether to set the principal and keytab properties |
| * @return the org.apache.hadoop.metrics2.MetricsSystem |
| */ |
| protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors, |
| boolean allowAppend, boolean useSecureParams) { |
| // If the prefix is not lower case, the metrics system won't be able to |
| // read any of the properties. |
| String prefix = methodName.getMethodName().toLowerCase(); |
| |
| ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000) |
| .add(prefix + ".sink.mysink0.class", MockSink.class.getName()) |
| .add(prefix + ".sink.mysink0.basepath", path) |
| .add(prefix + ".sink.mysink0.source", "testsrc") |
| .add(prefix + ".sink.mysink0.context", "test1") |
| .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors) |
| .add(prefix + ".sink.mysink0.allow-append", allowAppend) |
| .add(prefix + ".sink.mysink0.roll-offset-interval-millis", 0) |
| .add(prefix + ".sink.mysink0.roll-interval", "1h"); |
| |
| if (useSecureParams) { |
| builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY) |
| .add(prefix + ".sink.mysink0.principal-key", SINK_PRINCIPAL_KEY); |
| } |
| |
| builder.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix)); |
| |
| MetricsSystemImpl ms = new MetricsSystemImpl(prefix); |
| |
| ms.start(); |
| |
| return ms; |
| } |
| |
| /** |
| * Helper method that writes metrics files to a target path, reads those |
| * files, and returns the contents of all files as a single string. This |
| * method will assert that the correct number of files is found. |
| * |
| * @param ms an initialized MetricsSystem to use |
| * @param path the target path from which to read the logs |
| * @param count the number of log files to expect |
| * @return the contents of the log files |
| * @throws IOException when the log file can't be read |
| * @throws URISyntaxException when the target path is an invalid URL |
| */ |
| protected String doWriteTest(MetricsSystem ms, String path, int count) |
| throws IOException, URISyntaxException { |
| final String then = DATE_FORMAT.format(new Date()) + "00"; |
| |
| MyMetrics1 mm1 = new MyMetrics1().registerWith(ms); |
| new MyMetrics2().registerWith(ms); |
| |
| mm1.testMetric1.incr(); |
| mm1.testMetric2.incr(2); |
| |
| ms.publishMetricsNow(); // publish the metrics |
| |
| try { |
| ms.stop(); |
| } finally { |
| ms.shutdown(); |
| } |
| |
| return readLogFile(path, then, count); |
| } |
| |
| /** |
| * Read the log files at the target path and return the contents as a single |
| * string. This method will assert that the correct number of files is found. |
| * |
| * @param path the target path |
| * @param then when the test method began. Used to find the log directory in |
| * the case that the test run crosses the top of the hour. |
| * @param count the number of log files to expect |
| * @return |
| * @throws IOException |
| * @throws URISyntaxException |
| */ |
| protected String readLogFile(String path, String then, int count) |
| throws IOException, URISyntaxException { |
| final String now = DATE_FORMAT.format(new Date()) + "00"; |
| final String logFile = getLogFilename(); |
| FileSystem fs = FileSystem.get(new URI(path), new Configuration()); |
| StringBuilder metrics = new StringBuilder(); |
| boolean found = false; |
| |
| for (FileStatus status : fs.listStatus(new Path(path))) { |
| Path logDir = status.getPath(); |
| |
| // There are only two possible valid log directory names: the time when |
| // the test started and the current time. Anything else can be ignored. |
| if (now.equals(logDir.getName()) || then.equals(logDir.getName())) { |
| readLogData(fs, findMostRecentLogFile(fs, new Path(logDir, logFile)), |
| metrics); |
| assertFileCount(fs, logDir, count); |
| found = true; |
| } |
| } |
| |
| assertTrue("No valid log directories found", found); |
| |
| return metrics.toString(); |
| } |
| |
| /** |
| * Read the target log file and append its contents to the StringBuilder. |
| * @param fs the target FileSystem |
| * @param logFile the target file path |
| * @param metrics where to append the file contents |
| * @throws IOException thrown if the file cannot be read |
| */ |
| protected void readLogData(FileSystem fs, Path logFile, StringBuilder metrics) |
| throws IOException { |
| FSDataInputStream fsin = fs.open(logFile); |
| BufferedReader in = new BufferedReader(new InputStreamReader(fsin, |
| StandardCharsets.UTF_8)); |
| String line = null; |
| |
| while ((line = in.readLine()) != null) { |
| metrics.append(line).append("\n"); |
| } |
| } |
| |
| /** |
| * Return the path to the log file to use, based on the initial path. The |
| * initial path must be a valid log file path. This method will find the |
| * most recent version of the file. |
| * |
| * @param fs the target FileSystem |
| * @param initial the path from which to start |
| * @return the path to use |
| * @throws IOException thrown if testing for file existence fails. |
| */ |
| protected Path findMostRecentLogFile(FileSystem fs, Path initial) |
| throws IOException { |
| Path logFile = null; |
| Path nextLogFile = initial; |
| int id = 1; |
| |
| do { |
| logFile = nextLogFile; |
| nextLogFile = new Path(initial.toString() + "." + id); |
| id += 1; |
| } while (fs.exists(nextLogFile)); |
| |
| return logFile; |
| } |
| |
| /** |
| * Return the name of the log file for this host. |
| * |
| * @return the name of the log file for this host |
| */ |
| protected static String getLogFilename() throws UnknownHostException { |
| return "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log"; |
| } |
| |
| /** |
| * Assert that the given contents match what is expected from the test |
| * metrics. |
| * |
| * @param contents the file contents to test |
| */ |
| protected void assertMetricsContents(String contents) { |
| // Note that in the below expression we allow tags and metrics to go in |
| // arbitrary order, but the records must be in order. |
| final Pattern expectedContentPattern = Pattern.compile( |
| "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+" |
| + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|" |
| + "testTag2=testTagValue2,\\s+testTag1=testTagValue1)," |
| + "\\s+Hostname=.*,\\s+" |
| + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)" |
| + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1," |
| + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*", |
| Pattern.MULTILINE); |
| |
| assertTrue("Sink did not produce the expected output. Actual output was: " |
| + contents, expectedContentPattern.matcher(contents).matches()); |
| } |
| |
| /** |
| * Assert that the given contents match what is expected from the test |
| * metrics when there is pre-existing data. |
| * |
| * @param contents the file contents to test |
| */ |
| protected void assertExtraContents(String contents) { |
| // Note that in the below expression we allow tags and metrics to go in |
| // arbitrary order, but the records must be in order. |
| final Pattern expectedContentPattern = Pattern.compile( |
| "Extra stuff[\\n\\r]*" |
| + "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+" |
| + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|" |
| + "testTag2=testTagValue2,\\s+testTag1=testTagValue1)," |
| + "\\s+Hostname=.*,\\s+" |
| + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)" |
| + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1," |
| + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*", |
| Pattern.MULTILINE); |
| |
| assertTrue("Sink did not produce the expected output. Actual output was: " |
| + contents, expectedContentPattern.matcher(contents).matches()); |
| } |
| |
| /** |
| * Call {@link #doWriteTest} after pre-creating the log file and filling it |
| * with junk data. |
| * |
| * @param path the base path for the test |
| * @param ignoreErrors whether to ignore errors |
| * @param allowAppend whether to allow appends |
| * @param count the number of files to expect |
| * @return the contents of the final log file |
| * @throws IOException if a file system operation fails |
| * @throws InterruptedException if interrupted while calling |
| * {@link #getNowNotTopOfHour()} |
| * @throws URISyntaxException if the path is not a valid URI |
| */ |
| protected String doAppendTest(String path, boolean ignoreErrors, |
| boolean allowAppend, int count) |
| throws IOException, InterruptedException, URISyntaxException { |
| preCreateLogFile(path); |
| |
| return doWriteTest(initMetricsSystem(path, ignoreErrors, allowAppend), |
| path, count); |
| } |
| |
| /** |
| * Create a file at the target path with some known data in it: |
| * "Extra stuff". |
| * |
| * If the test run is happening within 20 seconds of the top of the hour, |
| * this method will sleep until the top of the hour. |
| * |
| * @param path the target path under which to create the directory for the |
| * current hour that will contain the log file. |
| * |
| * @throws IOException thrown if the file creation fails |
| * @throws InterruptedException thrown if interrupted while waiting for the |
| * top of the hour. |
| * @throws URISyntaxException thrown if the path isn't a valid URI |
| */ |
| protected void preCreateLogFile(String path) |
| throws IOException, InterruptedException, URISyntaxException { |
| preCreateLogFile(path, 1); |
| } |
| |
| /** |
| * Create files at the target path with some known data in them. Each file |
| * will have the same content: "Extra stuff". |
| * |
| * If the test run is happening within 20 seconds of the top of the hour, |
| * this method will sleep until the top of the hour. |
| * |
| * @param path the target path under which to create the directory for the |
| * current hour that will contain the log files. |
| * @param numFiles the number of log files to create |
| * @throws IOException thrown if the file creation fails |
| * @throws InterruptedException thrown if interrupted while waiting for the |
| * top of the hour. |
| * @throws URISyntaxException thrown if the path isn't a valid URI |
| */ |
| protected void preCreateLogFile(String path, int numFiles) |
| throws IOException, InterruptedException, URISyntaxException { |
| Calendar now = getNowNotTopOfHour(); |
| |
| FileSystem fs = FileSystem.get(new URI(path), new Configuration()); |
| Path dir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00"); |
| |
| fs.mkdirs(dir); |
| |
| Path file = new Path(dir, getLogFilename()); |
| |
| // Create the log file to force the sink to append |
| try (FSDataOutputStream out = fs.create(file)) { |
| out.write("Extra stuff\n".getBytes()); |
| out.flush(); |
| } |
| |
| if (numFiles > 1) { |
| int count = 1; |
| |
| while (count < numFiles) { |
| file = new Path(dir, getLogFilename() + "." + count); |
| |
| // Create the log file to force the sink to append |
| try (FSDataOutputStream out = fs.create(file)) { |
| out.write("Extra stuff\n".getBytes()); |
| out.flush(); |
| } |
| |
| count += 1; |
| } |
| } |
| } |
| |
| /** |
| * Return a calendar based on the current time. If the current time is very |
| * near the top of the hour (less than 20 seconds), sleep until the new hour |
| * before returning a new Calendar instance. |
| * |
| * @return a new Calendar instance that isn't near the top of the hour |
| * @throws InterruptedException if interrupted while sleeping |
| */ |
| public Calendar getNowNotTopOfHour() throws InterruptedException { |
| Calendar now = Calendar.getInstance(TimeZone.getTimeZone("GMT")); |
| |
| // If we're at the very top of the hour, sleep until the next hour |
| // so that we don't get confused by the directory rolling |
| if ((now.get(Calendar.MINUTE) == 59) && (now.get(Calendar.SECOND) > 40)) { |
| Thread.sleep((61 - now.get(Calendar.SECOND)) * 1000L); |
| now.setTime(new Date()); |
| } |
| |
| return now; |
| } |
| |
| /** |
| * Assert that the number of log files in the target directory is as expected. |
| * @param fs the target FileSystem |
| * @param dir the target directory path |
| * @param expected the expected number of files |
| * @throws IOException thrown if listing files fails |
| */ |
| public void assertFileCount(FileSystem fs, Path dir, int expected) |
| throws IOException { |
| RemoteIterator<LocatedFileStatus> i = fs.listFiles(dir, true); |
| int count = 0; |
| |
| while (i.hasNext()) { |
| i.next(); |
| count++; |
| } |
| |
| assertTrue("The sink created additional unexpected log files. " + count |
| + " files were created", expected >= count); |
| assertTrue("The sink created too few log files. " + count + " files were " |
| + "created", expected <= count); |
| } |
| |
| /** |
| * This class is a {@link RollingFileSystemSink} wrapper that tracks whether |
| * an exception has been thrown during operations. |
| */ |
| public static class MockSink extends RollingFileSystemSink { |
| public static volatile boolean errored = false; |
| public static volatile boolean initialized = false; |
| |
| @Override |
| public void init(SubsetConfiguration conf) { |
| try { |
| super.init(conf); |
| } catch (MetricsException ex) { |
| errored = true; |
| |
| throw new MetricsException(ex); |
| } |
| |
| initialized = true; |
| } |
| |
| @Override |
| public void putMetrics(MetricsRecord record) { |
| try { |
| super.putMetrics(record); |
| } catch (MetricsException ex) { |
| errored = true; |
| |
| throw new MetricsException(ex); |
| } |
| } |
| |
| @Override |
| public void close() { |
| try { |
| super.close(); |
| } catch (MetricsException ex) { |
| errored = true; |
| |
| throw new MetricsException(ex); |
| } |
| } |
| |
| @Override |
| public void flush() { |
| try { |
| super.flush(); |
| } catch (MetricsException ex) { |
| errored = true; |
| |
| throw new MetricsException(ex); |
| } |
| } |
| } |
| } |