| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.metrics2.sink; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.Calendar; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.metrics2.MetricsException; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Test the {@link RollingFileSystemSink} class in the context of HDFS. |
| */ |
| public class TestRollingFileSystemSinkWithHdfs |
| extends RollingFileSystemSinkTestBase { |
| private static final int NUM_DATANODES = 4; |
| private MiniDFSCluster cluster; |
| |
| /** |
| * Create a {@link MiniDFSCluster} instance with four nodes. The |
| * node count is required to allow append to function. Also clear the |
| * sink's test flags. |
| * |
| * @throws IOException thrown if cluster creation fails |
| */ |
| @Before |
| public void setupHdfs() throws IOException { |
| Configuration conf = new Configuration(); |
| |
| // It appears that since HDFS-265, append is always enabled. |
| cluster = |
| new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); |
| |
| // Also clear sink flags |
| RollingFileSystemSink.hasFlushed = false; |
| } |
| |
| /** |
| * Stop the {@link MiniDFSCluster}. |
| */ |
| @After |
| public void shutdownHdfs() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test writing logs to HDFS. |
| * |
| * @throws Exception thrown when things break |
| */ |
| @Test |
| public void testWrite() throws Exception { |
| String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| MetricsSystem ms = initMetricsSystem(path, false, true); |
| |
| assertMetricsContents(doWriteTest(ms, path, 1)); |
| } |
| |
| /** |
| * Test writing logs to HDFS if append is enabled and the log file already |
| * exists. |
| * |
| * @throws Exception thrown when things break |
| */ |
| @Test |
| public void testAppend() throws Exception { |
| String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| |
| assertExtraContents(doAppendTest(path, false, true, 1)); |
| } |
| |
| /** |
| * Test writing logs to HDFS if append is enabled, the log file already |
| * exists, and the sink is set to ignore errors. |
| * |
| * @throws Exception thrown when things break |
| */ |
| @Test |
| public void testSilentAppend() throws Exception { |
| String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| |
| assertExtraContents(doAppendTest(path, true, true, 1)); |
| } |
| |
| /** |
| * Test writing logs to HDFS without append enabled, when the log file already |
| * exists. |
| * |
| * @throws Exception thrown when things break |
| */ |
| @Test |
| public void testNoAppend() throws Exception { |
| String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| |
| assertMetricsContents(doAppendTest(path, false, false, 2)); |
| } |
| |
| /** |
| * Test writing logs to HDFS without append enabled, with ignore errors |
| * enabled, and when the log file already exists. |
| * |
| * @throws Exception thrown when things break |
| */ |
| @Test |
| public void testSilentOverwrite() throws Exception { |
| String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| |
| assertMetricsContents(doAppendTest(path, true, false, 2)); |
| } |
| |
| /** |
| * Test that writing to HDFS fails when HDFS is unavailable. |
| * |
| * @throws IOException thrown when reading or writing log files |
| */ |
| @Test |
| public void testFailedWrite() throws IOException { |
| final String path = |
| "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| MetricsSystem ms = initMetricsSystem(path, false, false); |
| |
| new MyMetrics1().registerWith(ms); |
| |
| shutdownHdfs(); |
| MockSink.errored = false; |
| |
| ms.publishMetricsNow(); // publish the metrics |
| |
| assertTrue("No exception was generated while writing metrics " |
| + "even though HDFS was unavailable", MockSink.errored); |
| |
| try { |
| ms.stop(); |
| } finally { |
| ms.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that closing a file in HDFS fails when HDFS is unavailable. |
| * |
| * @throws IOException thrown when reading or writing log files |
| */ |
| @Test |
| public void testFailedClose() throws IOException { |
| final String path = |
| "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| MetricsSystem ms = initMetricsSystem(path, false, false); |
| |
| new MyMetrics1().registerWith(ms); |
| |
| ms.publishMetricsNow(); // publish the metrics |
| |
| shutdownHdfs(); |
| MockSink.errored = false; |
| |
| try { |
| ms.stop(); |
| |
| assertTrue("No exception was generated while stopping sink " |
| + "even though HDFS was unavailable", MockSink.errored); |
| } catch (MetricsException ex) { |
| // Expected |
| } finally { |
| ms.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that writing to HDFS fails silently when HDFS is unavailable. |
| * |
| * @throws IOException thrown when reading or writing log files |
| * @throws java.lang.InterruptedException thrown if interrupted |
| */ |
| @Test |
| public void testSilentFailedWrite() throws IOException, InterruptedException { |
| final String path = |
| "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| MetricsSystem ms = initMetricsSystem(path, true, false); |
| |
| new MyMetrics1().registerWith(ms); |
| |
| shutdownHdfs(); |
| MockSink.errored = false; |
| |
| ms.publishMetricsNow(); // publish the metrics |
| |
| assertFalse("An exception was generated writing metrics " |
| + "while HDFS was unavailable, even though the sink is set to " |
| + "ignore errors", MockSink.errored); |
| |
| try { |
| ms.stop(); |
| } finally { |
| ms.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that closing a file in HDFS silently fails when HDFS is unavailable. |
| * |
| * @throws IOException thrown when reading or writing log files |
| */ |
| @Test |
| public void testSilentFailedClose() throws IOException { |
| final String path = |
| "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| MetricsSystem ms = initMetricsSystem(path, true, false); |
| |
| new MyMetrics1().registerWith(ms); |
| |
| ms.publishMetricsNow(); // publish the metrics |
| |
| shutdownHdfs(); |
| MockSink.errored = false; |
| |
| try { |
| ms.stop(); |
| |
| assertFalse("An exception was generated stopping sink " |
| + "while HDFS was unavailable, even though the sink is set to " |
| + "ignore errors", MockSink.errored); |
| } finally { |
| ms.shutdown(); |
| } |
| } |
| |
| /** |
| * This test specifically checks whether the flusher thread is automatically |
| * flushing the files. It unfortunately can only test with the alternative |
| * flushing schedule (because of test timing), but it's better than nothing. |
| * |
| * @throws Exception thrown if something breaks |
| */ |
| @Test |
| public void testFlushThread() throws Exception { |
| // Cause the sink's flush thread to be run immediately after the second |
| // metrics log is written |
| RollingFileSystemSink.forceFlush = true; |
| |
| String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| MetricsSystem ms = initMetricsSystem(path, true, false, false); |
| |
| new MyMetrics1().registerWith(ms); |
| |
| // Publish the metrics |
| ms.publishMetricsNow(); |
| // Pubish again because the first write seems to get properly flushed |
| // regardless. |
| ms.publishMetricsNow(); |
| |
| int count = 0; |
| |
| try { |
| // Sleep until the flusher has run. This should never actually need to |
| // sleep, but the sleep is here to make sure this test isn't flakey. |
| while (!RollingFileSystemSink.hasFlushed) { |
| Thread.sleep(10L); |
| |
| if (++count > 1000) { |
| fail("Flush thread did not run within 10 seconds"); |
| } |
| } |
| |
| Calendar now = Calendar.getInstance(); |
| Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00"); |
| FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration()); |
| Path currentFile = |
| findMostRecentLogFile(fs, new Path(currentDir, getLogFilename())); |
| FileStatus status = fs.getFileStatus(currentFile); |
| |
| // Each metrics record is 118+ bytes, depending on hostname |
| assertTrue("The flusher thread didn't flush the log contents. Expected " |
| + "at least 236 bytes in the log file, but got " + status.getLen(), |
| status.getLen() >= 236); |
| } finally { |
| RollingFileSystemSink.forceFlush = false; |
| |
| try { |
| ms.stop(); |
| } finally { |
| ms.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Test that a failure to connect to HDFS does not cause the init() method |
| * to fail. |
| */ |
| @Test |
| public void testInitWithNoHDFS() { |
| String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; |
| |
| shutdownHdfs(); |
| MockSink.errored = false; |
| initMetricsSystem(path, true, false); |
| |
| assertTrue("The sink was not initialized as expected", |
| MockSink.initialized); |
| assertFalse("The sink threw an unexpected error on initialization", |
| MockSink.errored); |
| } |
| } |